hbase与mapreduce集成后,运行mapreduce程序,同时需要mapreduce jar和hbase jar文件的支持,这时我们需要通过特殊设置使任务可以同时读取到hadoop jar和hbase jar文件内容,否则任务会报错。
我们知道仅仅运行mapreduce任务时,不需要设置classpath,这时因为运行bin/yarn命令时已经在命令脚本中针对hadoop执行jar包路径进行了预设置的缘故,但是bin/yarn不能自动设置hbase可执行jar路径,这也是情理之中的事。
一、mapreduce运行hbase程序方法(需要设置环境变量,否则会报错):
1、如果直接通过mapreduce去运行hbase程序,会报错找不到类:
$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/yarn jar /opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6/lib/hbase-server-0.98.6-cdh5.3.6.jar
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/filter/Filter
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2570)
at java.lang.Class.getMethod0(Class.java:2813)
at java.lang.Class.getMethod(Class.java:1663)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.<init>(ProgramDriver.java:60)
at org.apache.hadoop.util.ProgramDriver.addClass(ProgramDriver.java:104)
at org.apache.hadoop.hbase.mapreduce.Driver.main(Driver.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.filter.Filter
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 12 more
2、要想执行这个程序,需要设置classpath,设置方法如下:
export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-cdh5.3.6.jar
--执行任务如下:
$ export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
$ export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-cdh5.3.6.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2017-07-02 15:56:56,424 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in HBase table
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
--根据输出提示,可以得到hbase-server-0.98.6-cdh5.3.6.jar包提供的功能如下:
CellCounter: Count cells in HBase table
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
verifyrep:Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
现在执行一个hbase程序试试看-统计表中条目数:
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-cdh5.3.6.jar rowcounter user
二、hbase数据加载方式:
向hbase中加载数据,一般数据来源三种:
log
rdbms
爬虫
1、测试数据:
student.tsv
10001 zhangsan 35 male beijing 0109876543
10002 lisi 32 male shanghia 0109876563
10003 zhaoliu 35 female hangzhou 01098346543
10004 qianqi 35 male shenzhen 01098732543
2、上传文件到hdfs上:
/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir -p /user/hadoop/hbase/importtsv
/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put /opt/datas/student.tsv /user/hadoop/hbase/importtsv
3、hbase中创建student表:
create 'student','info'
4、将数据导入hbase的脚本程序:
export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
student \
hdfs://chavin.king:9000/user/hadoop/hbase/importtsv
--注意:
通常mapreduce在写hbase时使用的事tableOutputFormat方式,在reduce中直接生成put对象写入hbase,该方式在大数据量写入时效率低下(hbase会block写入,频繁进行flush,split,compact等大量io操作),并对hbase节点稳定性造成一定的影响(GC时间过长,相应缓慢,导致节点超市退出,并引起一系列连锁反应)。
5、bulk load方式导入数据到hbase中:
1)创建hbase中student2表:
create 'student2','info'
2)通过以下脚本生成hfile文件:
export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:name,info:age,info:sex,info:address,info:phone \
-Dimporttsv.bulk.output=hdfs://chavin.king:9000/user/hadoop/hbase/hfileoutput \
student2 \
hdfs://chavin.king:9000/user/hadoop/hbase/importtsv
--这里首先指定了参数-Dimporttsv.bulk.output,这时上述任务首先将目标文件转换为hfile格式文件,但并不马上导入到目标表中。
3)bulk load方式导入数据进入hbase student2表:
export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar \
completebulkload \
hdfs://chavin.king:9000/user/hadoop/hbase/hfileoutput \
student2
此步骤通过参数completebulkload直接移动步骤2生成的hfile文件到目标表路径,加快了数据加载的速度,同时提升了job运行稳定性。
--说明:
hbase支持bulk load的入库方式,即上述处理方式,它利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在hdfs中生成持久化的hfile格式文件,然后上传至合适位置,即完成海量数据快速入库的办法。配合mapreduce完成,高效快捷,而且不占用hregion资源,增添负载,在大数据量写入时能极大的提高写入效率,并减低对hbase节点的写入压力。
通过生成hfile,然后再bulkload到hbase的方式来替代之前直接调用HTableOutputFormat的方法有如下好处:
a)消除了对hbase集群插入压力
b)提高了job的运行速度,降低了job执行时间。
三、加载oracle经典测试表dept和emp到hbase中:
1、测试数据如下:
dept.tsv
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
emp.tsv
7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-02-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-02-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-04-02 2975.00 20
7654 MARTIN SALESMAN 7698 1981-09-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-05-01 2850.00 30
7782 CLARK MANAGER 7839 1981-06-09 2450.00 10
7788 SCOTT ANALYST 7566 1987-04-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-09-08 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-05-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-03 950.00 30
7902 FORD ANALYST 7566 1981-12-03 3000.00 20
7934 MILLER CLERK 7782 1982-01-23 1300.00 10
2、上传表到hdfs上
/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir -p /user/hadoop/hbase/scott/dept
/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put /opt/datas/dept.tsv /user/hadoop/hbase/scott/dept
/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir -p /user/hadoop/hbase/scott/emp
/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put /opt/datas/emp.tsv /user/hadoop/hbase/scott/emp
3、hbase中创建dept表和emp表
hbase(main):042:0* create 'dept','info'
0 row(s) in 0.5810 seconds
=> Hbase::Table - dept
hbase(main):043:0> create 'emp','info'
0 row(s) in 0.2290 seconds
4、通过以下脚本转换dept.tsv和emp.tsv文件为hfile格式文件:
export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:dname,info:loc \
-Dimporttsv.bulk.output=hdfs://chavin.king:9000/user/hadoop/hbase/deptfile \
dept \
hdfs://chavin.king:9000/user/hadoop/hbase/scott/dept
export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,\
info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno \
-Dimporttsv.bulk.output=hdfs://chavin.king:9000/user/hadoop/hbase/empfile \
emp \
hdfs://chavin.king:9000/user/hadoop/hbase/scott/emp
5、通过以下脚本将步骤4产生文件导入到目标表
export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar \
completebulkload \
hdfs://chavin.king:9000/user/hadoop/hbase/deptfile \
dept
export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6
export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
${HADOOP_HOME}/bin/yarn jar \
${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar \
completebulkload \
hdfs://chavin.king:9000/user/hadoop/hbase/empfile \
emp