hadoop2的mapreduce操作hbase数据

时间:2022-09-28 21:43:11

1、从hbase中取数据,再把计算结果插入hbase中

package com.yeliang;
import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; /**
* 从hbase里取数据,分析完成后插入到hbase里
* @author liang.ye
*
*/
public class FamilyHBase { public static class Map extends TableMapper<Text, IntWritable>{ @Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
for (Cell cell : value.rawCells())
{
if(new String(CellUtil.cloneQualifier(cell)).equals("GroupID")){
context.write(new Text(new String(CellUtil.cloneValue(cell))), new IntWritable(1));
}
}
}
} public static class Reduce extends
TableReducer<Text, IntWritable, NullWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum += i.get();
}
byte[] keyBytes = Bytes.toBytes(key.toString());
if(keyBytes.length>0){
Put put = new Put(keyBytes);
// Put实例化,每一个词存一行
put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
Bytes.toBytes(String.valueOf(sum)));
// 列族为content,列为count,列值为数目
context.write(NullWritable.get(), put);
}
}
} public static void createHBaseTable(String tableName) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor col = new HColumnDescriptor("content");
htd.addFamily(col);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","192.168.56.101,192.168.56.102,192.168.56.103");
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.out.println("table exists, trying to recreate table......");
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
System.out.println("create new table:" + tableName);
admin.createTable(htd);
} public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
String tableName = "family_sum_by_groupid";
Configuration conf = HBaseConfiguration.create();
//conf.set("hbase.zookeeper.quorum","192.168.56.101,192.168.56.102,192.168.56.103");
createHBaseTable(tableName);
Job job = new Job(conf, "family_sum_by_groupid ");
job.setJarByClass(FamilyHBase.class);
Scan scan = new Scan();
scan.addFamily("cf".getBytes());
TableMapReduceUtil.initTableMapperJob("family3", scan, Map.class, Text.class, IntWritable.class, job);
TableMapReduceUtil.initTableReducerJob(tableName, Reduce.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

  2、从hdfs中取数据,把计算的结果插入到hdfs中

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; /**
* 从hdfs中取数分析,然后插入到hbase里
* @author liang.ye
*
*/
public class WordCountHBase { public static class Map extends
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable i = new IntWritable(1); public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String s[] = value.toString().trim().split(" ");
// 将输入的每行以空格分开
for (String m : s) {
context.write(new Text(m), i);
}
}
} public static class Reduce extends
TableReducer<Text, IntWritable, NullWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum += i.get();
}
byte[] keyBytes = Bytes.toBytes(key.toString());
if(keyBytes.length>0){
Put put = new Put(keyBytes);
// Put实例化,每一个词存一行
put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
Bytes.toBytes(String.valueOf(sum)));
// 列族为content,列为count,列值为数目
context.write(NullWritable.get(), put);
}
}
} public static void createHBaseTable(String tableName) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor col = new HColumnDescriptor("content");
htd.addFamily(col);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","192.168.56.101,192.168.56.102,192.168.56.103");
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.out.println("table exists, trying to recreate table......");
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
System.out.println("create new table:" + tableName);
admin.createTable(htd);
} public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
String tableName = "WordCount2";
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
createHBaseTable(tableName);
String input = args[0];
Job job = new Job(conf, "WordCount table with " + input);
job.setJarByClass(WordCountHBase.class);
job.setNumReduceTasks(3);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(input));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

  

hadoop2的mapreduce操作hbase数据的更多相关文章

  1. Mapreduce操作HBase

    这个操作和普通的Mapreduce还不太一样,比如普通的Mapreduce输入可以是txt文件等,Mapreduce可以直接读取Hive中的表的数据(能够看见是以类似txt文件形式),但Mapredu ...

  2. Hadoop生态圈-使用MapReduce处理HBase数据

    Hadoop生态圈-使用MapReduce处理HBase数据 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.对HBase表中数据进行单词统计(TableInputFormat) ...

  3. HBase 相关API操练&lpar;三&rpar;:MapReduce操作HBase

    MapReduce 操作 HBase 在 HBase 系统上运行批处理运算,最方便和实用的模型依然是 MapReduce,如下图所示. HBase Table 和 Region 的关系类似 HDFS ...

  4. 7&period;MapReduce操作Hbase

    7 HBase的MapReduce   HBase中Table和Region的关系,有些类似HDFS中File和Block的关系.由于HBase提供了配套的与MapReduce进行交互的API如 Ta ...

  5. Hbase第五章 MapReduce操作HBase

    容易遇到的坑: 当用mapReducer操作HBase时,运行jar包的过程中如果遇到 java.lang.NoClassDefFoundError 类似的错误时,一般是由于hadoop环境没有hba ...

  6. Hbase理论&amp&semi;&amp&semi;hbase shell&amp&semi;&amp&semi;python操作hbase&amp&semi;&amp&semi;python通过mapreduce操作hbase

    一.Hbase搭建: 二.理论知识介绍: 1Hbase介绍: Hbase是分布式.面向列的开源数据库(其实准确的说是面向列族).HDFS为Hbase提供可靠的底层数据存储服务,MapReduce为Hb ...

  7. HBase学习之路 (五)MapReduce操作Hbase

    MapReduce从HDFS读取数据存储到HBase中 现有HDFS中有一个student.txt文件,格式如下 95002,刘晨,女,19,IS 95017,王风娟,女,18,IS 95018,王一 ...

  8. 使用MapReduce读取HBase数据存储到MySQL

    Mapper读取HBase数据 package MapReduce; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hba ...

  9. MapReduce操作Hbase --table2file

    官方手册:http://hbase.apache.org/book.html#mapreduce.example 简单的操作,将hbase表中的数据写入到文件中. RunJob 源码: import ...

随机推荐

  1. 介绍DSA数字签名,非对称加密的另一种实现

    接下来我们介绍DSA数字签名,非对称加密的另一种实现. DSA DSA-Digital Signature Algorithm 是Schnorr和ElGamal签名算法的变种,被美国NIST作为DSS ...

  2. django向数据库添加数据

    url.py views.py host.html (样式) (展示部分) (添加信息界面) (js部分) 展示添加数据:

  3. XML于JSON

    XML:可拓展的标记语言(跨平台数据表现)用于保存数据 XML:标记需要关闭 :单根性 .NET中DOM常用对象: XmlDocument :一个XML文档 XmlNode:xml中的单个节点 Xml ...

  4. win7下loadrunner创建mysql数据库参数化问题解决

    问题现象: 安装mysql数据源驱动后,lr创建mysql驱动程序列表没有安装的驱动程序: 安装完mysql ODBC数据源后 2.在控制面板-数据源(ODBC) 3.创建mysql数据源: 4.从l ...

  5. 【开源项目4】Android ExpandableListView

    如果你对Android提供的Android ExpandableListView并不满意,一心想要实现诸如Spotify应用那般的效果,那么SlideExpandableListView绝对是你最好的 ...

  6. 电脑键盘上的F键有什么用 电脑F键功能讲解

    接触电脑这么多年了,F1到F12这几个键你真的会用吗?电脑键盘上的F键有什么用?你了解过吗?这里带来电脑F键功能讲解,一起来看看. F1:帮助 在程序里或者资源管理器界面,按F1会弹出帮助按钮. F2 ...

  7. (转)C&num; 中使用分布式缓存系统Memcached

    转自:http://blog.csdn.net/devgis/article/details/8212917 缘起: 在数据驱动的web开发中,经常要重复从数据库中取出相同的数据,这种重复极大的增加了 ...

  8. Sql Server 中如果使用TransactionScope开启一个分布式事务,使用该事务两个并发的连接会互相死锁吗

    提问: 如果使用TransactionScope开启一个分布式事务,使用该事务两个并发的连接会互相死锁吗? 如果在.Net中用TransactionScope开启一个事务. 然后在该事务范围内启动两个 ...

  9. Sublime Text2&sol;3怎样在Mac OSX中配置CTags插件

    参考地址: http://jingyan.baidu.com/article/48206aeafba820216ad6b3f5.html

  10. java 代理设计模式

    首先代理(deleration)是什么,在日常生活中我们有很多这种的例子,比如你上个QQ,各种空间被什么代理刷屏,对的,代理不是生产产品的商家,也不是进购产品的卖家,他们只是帮别人卖东西,这就相当于一 ...