I. Create the HBase student score table 'test' and the student average score table 'average_score'
First, the student scores need to be inserted into table 'test'; the program also creates a student average score table 'average_score' for the MapReduce job to write its results into.
1. Data to be inserted into HBase
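The six rows inserted by InitData.java (see section III):

row key   student_Name   subject   score
1         tom            yuwen     80
2         tom            shuxue    98
3         tom            yingyu    89
4         cat            yuwen     88
5         cat            shuxue    99
6         cat            yingyu    90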
2. Hadoop command to run the program
Location of the InitData class (which initializes the HBase tables) in the project:
Format of the hadoop jar command:
In the command below, XXX is the name of the jar file and KKK is the fully qualified name of the class whose main method should run
hadoop jar XXX.jar KKK
The actual command:
hadoop jar hadoop_averageScore.jar hadoop_averageScore.InitData
3. Check the program's results
3.1 After the score data has been inserted, inspect table 'test' with the hbase shell
Commands:
hbase shell
scan 'test'
Result:
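The exact formatting depends on the HBase version, but with the six rows above, scan 'test' should show output roughly like this (timestamps will differ; the values are stored under an empty qualifier, so only the family name appears):

ROW    COLUMN+CELL
 1     column=score:, timestamp=..., value=80
 1     column=student_Name:, timestamp=..., value=tom
 1     column=subject:, timestamp=..., value=yuwen
 ...   (rows 2 through 6 follow the same pattern)
6 row(s)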
3.2 List the tables that exist in HBase after the program has run
Commands:
hbase shell
list
Result:
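Assuming a cluster where only the two tables created by this project exist, the output should look roughly like:

TABLE
average_score
test
2 row(s)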
II. Run the MapReduce program that computes each student's average score
1. Command to run the MapReduce program
hadoop jar hadoop_averageScore.jar hadoop_averageScore.average_score
2. Inspect table 'average_score' with the hbase shell
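Commands:
hbase shell
scan 'average_score'
With the data from section I, tom's average is (80 + 98 + 89) / 3 = 89.0 and cat's is (88 + 99 + 90) / 3 ≈ 92.33, so the scan should show one row per student name, with the average stored in column family avg.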
III. The complete Java project
InitData.java initializes the student score table 'test' and creates the student average score table 'average_score' for the MapReduce job to write into.
average_score.java is the MapReduce program: it reads student scores from table 'test', computes each student's average, and writes the result to table 'average_score'.
HBaseUtil.java is a utility class that wraps common HBase operations used by the program.
1. InitData.java
package hadoop_averageScore;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
public class InitData {
@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
// If table 'average_score' does not exist, create it
if (!admin.tableExists("average_score"))
{
// Create the student average score table 'average_score' with a single column family 'avg' (short for average)
HTableDescriptor tableDescriptor = new HTableDescriptor("average_score");
HColumnDescriptor columnDescriptor =new HColumnDescriptor("avg");
tableDescriptor.addFamily(columnDescriptor);
admin.createTable(tableDescriptor);
System.out.println("************************** Created table 'average_score' successfully **************************");
}
// If table 'test' does not exist, create it
if (!admin.tableExists("test"))
{
// Create the student score table 'test' with three column families: student_Name, subject, score
HTableDescriptor tableDescriptor = new HTableDescriptor("test");
tableDescriptor.addFamily(new HColumnDescriptor("student_Name"));
tableDescriptor.addFamily(new HColumnDescriptor("subject"));
tableDescriptor.addFamily(new HColumnDescriptor("score"));
admin.createTable(tableDescriptor);
System.out.println("************************** Created table 'test' successfully **************************");
}
// Get a handle to the student score table 'test'
HTable htable = HBaseUtil.getHTable("test");
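// Buffer the Puts client-side; flushCommits() below sends them in one batch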
htable.setAutoFlush(false);
// Build the student score data
List<Put> puts = new ArrayList<Put>();
// Each row: {row key, student name, subject, score}
String[][] rows = {
    {"1", "tom", "yuwen", "80"},
    {"2", "tom", "shuxue", "98"},
    {"3", "tom", "yingyu", "89"},
    {"4", "cat", "yuwen", "88"},
    {"5", "cat", "shuxue", "99"},
    {"6", "cat", "yingyu", "90"}
};
for (String[] r : rows) {
    puts.add(HBaseUtil.getPut(r[0], "student_Name", null, r[1]));
    puts.add(HBaseUtil.getPut(r[0], "subject", null, r[2]));
    puts.add(HBaseUtil.getPut(r[0], "score", null, r[3]));
}
// Insert the score data into table 'test'
htable.put(puts);
htable.flushCommits();
htable.close();
System.out.println("************************** Inserted score data into table 'test' successfully **************************");
}
}
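Note that InitData.java uses the old client API (HBaseAdmin, HTable, Put.add), which is deprecated in HBase 1.x and removed in 2.x. Purely as a sketch, assuming an HBase 1.x client (the class name InitDataModern is hypothetical, not part of the original project), the same create-and-insert steps with the Connection-based API look like:

package hadoop_averageScore;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class InitDataModern {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Create 'average_score' with one column family 'avg' if it is missing
            TableName avg = TableName.valueOf("average_score");
            if (!admin.tableExists(avg)) {
                HTableDescriptor htd = new HTableDescriptor(avg);
                htd.addFamily(new HColumnDescriptor("avg"));
                admin.createTable(htd);
            }
            // Insert one sample row into 'test' (assumes 'test' already exists)
            try (Table table = connection.getTable(TableName.valueOf("test"))) {
                Put put = new Put(Bytes.toBytes("1"));
                put.addColumn(Bytes.toBytes("score"), null, Bytes.toBytes("80"));
                table.put(put);
            }
        }
    }
}

The try-with-resources blocks ensure the connection, admin, and table handles are closed even if an exception is thrown.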
2. average_score.java
package hadoop_averageScore;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class average_score {
/** * TableMapper<Text,Text>: the first Text is the output key type, the second Text is the output value type */
public static class MyMapper extends TableMapper<Text,Text>{
private static Text scoreText = new Text();
private static Text nameText = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
// Read this row's score and student name (both stored under a null qualifier)
byte[] score = value.getValue(Bytes.toBytes("score"), null);
byte[] studentName = value.getValue(Bytes.toBytes("student_Name"), null);
nameText.set(studentName);
scoreText.set(score);
context.write(nameText, scoreText);
}
}
/** * TableReducer<Text,Text,ImmutableBytesWritable>: Text is the input key type, Text the input value type, ImmutableBytesWritable the output key type */
public static class MyReducer extends TableReducer<Text,Text,ImmutableBytesWritable>{
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// Number of subjects
int courseCount = 0;
// Total score
int sum = 0;
// Average score
float average = 0;
for(Text val:values) {
sum+=Integer.parseInt(val.toString());
courseCount ++;
}
// Compute the average; cast to float so the division is not truncated to an integer
average = (float) sum / courseCount;
// One output row per student, with the student name as the row key
Put put = new Put(Bytes.toBytes(key.toString()));
// Store the average in column family 'avg' (null qualifier).
// String.valueOf(average) converts the number to a string first; writing the
// raw float bytes instead would show up in the hbase shell as \x.. escapes.
put.add(Bytes.toBytes("avg"), null, Bytes.toBytes(String.valueOf(average)));
context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf);
job.setJarByClass(average_score.class);
Scan scan = new Scan();
// Optionally restrict the scan to specific column families (see the sketch after this class)
// scan.addColumn(Bytes.toBytes("score"),null);
// The Mapper reads from table 'test'
TableMapReduceUtil.initTableMapperJob("test", scan, MyMapper.class, Text.class, Text.class, job);
// The Reducer writes its output to table 'average_score'
TableMapReduceUtil.initTableReducerJob("average_score", MyReducer.class, job);
System.exit(job.waitForCompletion(true)?0:1);
}
}
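One refinement the commented-out addColumn line in main hints at: MyMapper only reads the score and student_Name families, so the Scan handed to initTableMapperJob can be narrowed, cutting the data shipped to the mappers. A sketch of how the Scan setup in main could look (my suggestion, not in the original; addFamily is used because the values sit under a null qualifier):

Scan scan = new Scan();
// Only ship the two column families the mapper actually reads
scan.addFamily(Bytes.toBytes("score"));
scan.addFamily(Bytes.toBytes("student_Name"));
// Fetch more rows per RPC round-trip during the full-table scan
scan.setCaching(500);
// A one-off scan should not evict hot data from the region server's block cache
scan.setCacheBlocks(false);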
3. HBaseUtil.java
package hadoop_averageScore;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseUtil {
/** * Initialize the HBase configuration * @return */
public static Configuration getConfiguration(){
Configuration conf = HBaseConfiguration.create();
// Must match the value configured in hbase-site.xml
conf.set("hbase.zookeeper.quorum", "h1,h2,h3");
return conf;
}
/** * Instantiate HBaseAdmin, which is used to operate on table metadata * @return * @throws IOException */
public static HBaseAdmin getHBaseAdmin() throws IOException{
return new HBaseAdmin(getConfiguration());
}
/** * Create a table * @param tableName table name * @param columnFamilies column families * @throws IOException */
@SuppressWarnings("deprecation")
public static void createTable(String tableName,String...columnFamilies) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
for(String fc : columnFamilies) {
htd.addFamily(new HColumnDescriptor(fc));
}
getHBaseAdmin().createTable(htd);
}
/** * Get the HTableDescriptor of a table * @param tableName * @return * @throws IOException */
public static HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException{
return getHBaseAdmin().getTableDescriptor(tableName);
}
/** * Get a table handle * @param tableName table name * @return * @throws IOException */
public static HTable getHTable(String tableName) throws IOException{
return new HTable(getConfiguration(),tableName);
}
/** * Build a Put, which encapsulates one row of data to insert * @param row * @param columnFamily * @param qualifier * @param value * @return * @throws IOException */
public static Put getPut(String row,String columnFamily,String qualifier,String value) throws IOException{
Put put = new Put(row.getBytes());
if(qualifier==null||"".equals(qualifier)) {
put.add(columnFamily.getBytes(), null, value.getBytes());
}else {
put.add(columnFamily.getBytes(), qualifier.getBytes(), value.getBytes());
}
return put;
}
/** * Fetch a single row * @param tableName table name * @param row row key * @return * @throws IOException */
public static Result getResult(String tableName,String row) throws IOException {
Get get = new Get(row.getBytes());
HTable htable = getHTable(tableName);
Result result = htable.get(get);
htable.close();
return result;
}
/** * Conditional scan * @param tableName table name * @param columnFamily column family * @param queryCondition value to match * @param begin start row * @param end stop row (exclusive) * @return * @throws IOException */
public static ResultScanner getResultScanner(String tableName,String columnFamily,String queryCondition,String begin,String end) throws IOException{
Scan scan = new Scan();
// Set the start row
scan.setStartRow(Bytes.toBytes(begin));
// Set the stop row (exclusive)
scan.setStopRow(Bytes.toBytes(end));
// Restrict the scan to the given column family
scan.addColumn(Bytes.toBytes(columnFamily),null);
// Match rows whose value in this column family equals queryCondition
Filter filter1 = new SingleColumnValueFilter(Bytes.toBytes(columnFamily),null,CompareOp.EQUAL,Bytes.toBytes(queryCondition));
//Filter filter2 = new SingleColumnValueFilter(Bytes.toBytes(columnFamily),null,CompareOp.EQUAL,Bytes.toBytes("chuliuxiang"));
FilterList filterList = new FilterList(Operator.MUST_PASS_ONE,Arrays.asList(filter1));
scan.setFilter(filterList);
HTable htable = getHTable(tableName);
ResultScanner rs = htable.getScanner(scan);
// Do not close the table here: closing it before the scanner is consumed
// invalidates the scanner. The caller is responsible for closing the
// returned ResultScanner when done.
return rs;
}
}
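For completeness, a hedged usage sketch of HBaseUtil (imports as in HBaseUtil.java; this assumes the tables from section I exist, and remember that the caller must close the ResultScanner returned by getResultScanner):

// Fetch row '1' from 'test' and print the student name
Result row = HBaseUtil.getResult("test", "1");
System.out.println(Bytes.toString(row.getValue(Bytes.toBytes("student_Name"), null)));

// Scan rows 1..3 ('4' is the exclusive stop row) where student_Name equals "tom"
ResultScanner rs = HBaseUtil.getResultScanner("test", "student_Name", "tom", "1", "4");
try {
    for (Result r : rs) {
        System.out.println(Bytes.toString(r.getRow()));
    }
} finally {
    rs.close();
}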