Connecting to HBase from MapReduce

Time: 2022-07-26 18:23:39

I. Lab Topic

Connecting to HBase from MapReduce

II. Lab Objectives

Write a MapReduce program that processes data stored in HBase, and run it from Eclipse.

III. Lab Procedure

1. Launch Eclipse and create a Java Project named hbase1.
2. Add the library files the project needs. At first I added only the jars listed in the handout, but the program kept failing at runtime with a "main class not found" error; it turned out some jars were missing. To be safe, I added every jar file under hbase/lib to the build path, as shown:
(screenshot: the project build path with all jars from hbase/lib added)
3. Write the mapper, reducer, and driver. Mapper code:
package com.hbasepackage;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testMapper extends TableMapper<Text, IntWritable> {
    @Override
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
            throws IOException, InterruptedException {
        try {
            // convert the rowkey ("date#id") to a string
            String inKey = new String(rowKey.get());
            // keep only the date part as the output key
            String oKey = inKey.split("#")[0];
            // read the sales column as bytes first, then convert it to a
            // string (it was stored as a string from the hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            int sales = Integer.parseInt(new String(bSales));
            // emit (date, sales)
            context.write(new Text(oKey), new IntWritable(sales));
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
    }
}
Reducer code:
package com.hbasepackage;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testReducer extends
        TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        try {
            int sum = 0;
            // add up the sales values for this date
            for (IntWritable sales : values) {
                sum += sales.get();
            }
            System.out.println(key.toString() + "\t" + sum);
            // create an hbase Put with the date as the rowkey
            // (Bytes.toBytes(key.toString()) rather than key.getBytes(),
            // which can return a padded backing array)
            Put insHBase = new Put(Bytes.toBytes(key.toString()));
            // store the sum in the output column
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("Total sales:"),
                    Bytes.toBytes(sum));
            // write the row to the output HBase table
            context.write(null, insHBase);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
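The mapper and reducer above boil down to two operations: split a "date#id" rowkey and keep the date, then sum the sales per date. That core logic can be sanity-checked in plain Java without a cluster; the class name SalesKeyDemo and the sample rowkeys below are made up for illustration, but the rowkey format matches what the mapper's split("#") expects:

```java
import java.util.HashMap;
import java.util.Map;

public class SalesKeyDemo {

    // what the mapper does: keep only the date part of a "date#id" rowkey
    public static String dateKey(String rowKey) {
        return rowKey.split("#")[0];
    }

    // what the shuffle + reducer do: group rows by date and sum the sales
    public static Map<String, Integer> totals(String[][] rows) {
        Map<String, Integer> sums = new HashMap<>();
        for (String[] row : rows) {
            sums.merge(dateKey(row[0]), Integer.parseInt(row[1]), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // hypothetical (rowkey, sales) pairs mirroring the EMPLOYEE table
        String[][] rows = {
            { "2022-07-26#1001", "100" },
            { "2022-07-26#1002", "250" },
            { "2022-07-27#1001", "80" },
        };
        System.out.println(totals(rows).get("2022-07-26")); // 350
        System.out.println(totals(rows).get("2022-07-27")); // 80
    }
}
```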
Driver code:
package com.hbasepackage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class testDriver {
    public static void main(String[] args) throws Exception {
        // load hbase-site.xml as well as the hadoop defaults
        Configuration conf = HBaseConfiguration.create();
        // define the scan and the column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));
        Job job = new Job(conf);
        job.setJarByClass(testDriver.class);
        // define the input hbase table (this also sets the mapper class)
        TableMapReduceUtil.initTableMapperJob(
                "EMPLOYEE",
                scan,
                testMapper.class,
                Text.class,
                IntWritable.class,
                job);
        // define the output table (this also sets the reducer class)
        TableMapReduceUtil.initTableReducerJob("TotalSale", testReducer.class,
                job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4. Start Hadoop and HBase.
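Assuming a standard single-node setup with the Hadoop and HBase start scripts on the PATH, bringing the daemons up looks something like this (HBase is started last since it depends on HDFS):

```shell
# start HDFS and YARN first
start-dfs.sh
start-yarn.sh
# then start HBase
start-hbase.sh
# jps should now list NameNode, DataNode, HMaster, HRegionServer, etc.
jps
```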

5. Open the hbase shell, create the tables, and insert some data.
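The exact commands are not visible in the screenshot, but based on the table, column family, and rowkey format the code assumes, the setup would look roughly like this (the sample rowkeys and sales values are made up; note the output table must exist before the job runs):

```shell
hbase shell
create 'EMPLOYEE', 'cf1'
put 'EMPLOYEE', '2022-07-26#1001', 'cf1:sales', '100'
put 'EMPLOYEE', '2022-07-26#1002', 'cf1:sales', '250'
put 'EMPLOYEE', '2022-07-27#1001', 'cf1:sales', '80'
create 'TotalSale', 'cf1'
scan 'EMPLOYEE'
```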
6. Run the program and check the results.

IV. Results

Results in Eclipse: (screenshot)
Results in the hbase shell: (screenshot)

V. Summary

The handout had problems yet again, but there is no need to panic when something goes wrong: keep looking things up, and the problem will eventually get solved.