HBase with MapReduce (SummaryToFile)

时间:2023-03-08 16:33:37

上一篇文章是实现统计hbase单元值出现的个数,并将结果存放到hbase的表中,本文是将结果存放到hdfs上。其中的map实现与前文一直,连接:http://www.cnblogs.com/ljy2013/p/4820056.html,下面主要介绍一下reduce的实现:

(1)reduce的实现

package com.datacenter.HbaseMapReduce.SummaryToFile;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class SummaryToFileReducer extends
Reducer<Text, IntWritable, Text, IntWritable> { @Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
int i = 0;
for (IntWritable val : values) {
i += val.get();
}
context.write(key, new IntWritable(i));
} }

(2)主类的实现也有些不同

package com.datacenter.HbaseMapReduce.SummaryToFile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SummaryToFilemain {
static String rootdir = "hdfs://hadoop3:8020/hbase";
static String zkServer = "hadoop3";
static String port = "2181"; private static Configuration conf;
private static HConnection hConn = null; public static void HbaseUtil(String rootDir, String zkServer, String port) { conf = HBaseConfiguration.create();// 获取默认配置信息
conf.set("hbase.rootdir", rootDir);
conf.set("hbase.zookeeper.quorum", zkServer);
conf.set("hbase.zookeeper.property.clientPort", port); try {
hConn = HConnectionManager.createConnection(conf);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
HbaseUtil(rootdir, zkServer, port); Job job = new Job(conf,"ExampleSummaryToFile");
job.setJarByClass(SummaryToFilemain.class); // class that contains mapper and reducer Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs TableMapReduceUtil.initTableMapperJob(
"test", // input table
scan, // Scan instance to control CF and attribute selection
SummaryMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
job.setReducerClass(SummaryToFileReducer.class); // reducer class
job.setNumReduceTasks(1); // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop3:8020/user/liujiyu/score-test")); // adjust directories as required boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
} }