hbase运行mapreduce设置及基本数据加载方法

时间:2021-09-04 18:44:59

hbase与mapreduce集成后,运行mapreduce程序,同时需要mapreduce jar和hbase jar文件的支持,这时我们需要通过特殊设置使任务可以同时读取到hadoop jar和hbase jar文件内容,否则任务会报错。

我们知道仅仅运行mapreduce任务时,不需要设置classpath,这时因为运行bin/yarn命令时已经在命令脚本中针对hadoop执行jar包路径进行了预设置的缘故,但是bin/yarn不能自动设置hbase可执行jar路径,这也是情理之中的事。

一、mapreduce运行hbase程序方法(需要设置环境变量,否则会报错):

1、如果直接通过mapreduce去运行hbase程序,会报错找不到类:

$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/yarn jar /opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6/lib/hbase-server-0.98.6-cdh5.3.6.jar

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/filter/Filter
     at java.lang.Class.getDeclaredMethods0(Native Method)
     at java.lang.Class.privateGetDeclaredMethods(Class.java:2570)
     at java.lang.Class.getMethod0(Class.java:2813)
     at java.lang.Class.getMethod(Class.java:1663)
     at org.apache.hadoop.util.ProgramDriver$ProgramDescription.<init>(ProgramDriver.java:60)
     at org.apache.hadoop.util.ProgramDriver.addClass(ProgramDriver.java:104)
     at org.apache.hadoop.hbase.mapreduce.Driver.main(Driver.java:39)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.filter.Filter
     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
     at java.security.AccessController.doPrivileged(Native Method)
     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
     ... 12 more
    

2、要想执行这个程序,需要设置classpath,设置方法如下:

export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-cdh5.3.6.jar

--执行任务如下:

$ export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

$ export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-cdh5.3.6.jar

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

2017-07-02 15:56:56,424 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

An example program must be given as the first argument.

Valid program names are:
   CellCounter: Count cells in HBase table
   completebulkload: Complete a bulk data load.
   copytable: Export a table from local cluster to peer cluster
   export: Write table data to HDFS.
   import: Import data written by Export.
   importtsv: Import data in TSV format.
   rowcounter: Count rows in HBase table
   verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

--根据输出提示,可以得到hbase-server-0.98.6-cdh5.3.6.jar包提供的功能如下:
   CellCounter: Count cells in HBase table
   completebulkload: Complete a bulk data load.
   copytable: Export a table from local cluster to peer cluster
   export: Write table data to HDFS.
   import: Import data written by Export.
   importtsv: Import data in TSV format.
   rowcounter: Count rows in HBase table
   verifyrep:Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

现在执行一个hbase程序试试看-统计表中条目数:

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-cdh5.3.6.jar rowcounter user

二、hbase数据加载方式:

向hbase中加载数据,一般数据来源三种:
     log
     rdbms
     爬虫

1、测试数据:

student.tsv

10001    zhangsan    35    male    beijing    0109876543

10002    lisi    32    male    shanghia    0109876563

10003    zhaoliu    35    female    hangzhou    01098346543

10004    qianqi    35    male    shenzhen    01098732543

2、上传文件到hdfs上:

/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir -p /user/hadoop/hbase/importtsv

/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put /opt/datas/student.tsv /user/hadoop/hbase/importtsv

3、hbase中创建student表:

create 'student','info'

4、将数据导入hbase的脚本程序:

export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
         ${HADOOP_HOME}/bin/yarn jar \

${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv \

-Dimporttsv.columns=HBASE_ROW_KEY,\

info:name,info:age,info:sex,info:address,info:phone \

student \

hdfs://chavin.king:9000/user/hadoop/hbase/importtsv

--注意:

通常mapreduce在写hbase时使用的事tableOutputFormat方式,在reduce中直接生成put对象写入hbase,该方式在大数据量写入时效率低下(hbase会block写入,频繁进行flush,split,compact等大量io操作),并对hbase节点稳定性造成一定的影响(GC时间过长,相应缓慢,导致节点超市退出,并引起一系列连锁反应)。

5、bulk load方式导入数据到hbase中:

1)创建hbase中student2表:

create 'student2','info'

2)通过以下脚本生成hfile文件:

export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
         ${HADOOP_HOME}/bin/yarn jar \

${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv \

-Dimporttsv.columns=HBASE_ROW_KEY,\

info:name,info:age,info:sex,info:address,info:phone \

-Dimporttsv.bulk.output=hdfs://chavin.king:9000/user/hadoop/hbase/hfileoutput \

student2 \

hdfs://chavin.king:9000/user/hadoop/hbase/importtsv

--这里首先指定了参数-Dimporttsv.bulk.output,这时上述任务首先将目标文件转换为hfile格式文件,但并不马上导入到目标表中。

3)bulk load方式导入数据进入hbase student2表:

export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
         ${HADOOP_HOME}/bin/yarn jar \

${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar \

completebulkload \

hdfs://chavin.king:9000/user/hadoop/hbase/hfileoutput \

student2

此步骤通过参数completebulkload直接移动步骤2生成的hfile文件到目标表路径,加快了数据加载的速度,同时提升了job运行稳定性。

--说明:

hbase支持bulk load的入库方式,即上述处理方式,它利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在hdfs中生成持久化的hfile格式文件,然后上传至合适位置,即完成海量数据快速入库的办法。配合mapreduce完成,高效快捷,而且不占用hregion资源,增添负载,在大数据量写入时能极大的提高写入效率,并减低对hbase节点的写入压力。

通过生成hfile,然后再bulkload到hbase的方式来替代之前直接调用HTableOutputFormat的方法有如下好处:

a)消除了对hbase集群插入压力

b)提高了job的运行速度,降低了job执行时间。

三、加载oracle经典测试表dept和emp到hbase中:

1、测试数据如下:

dept.tsv

10    ACCOUNTING    NEW YORK

20    RESEARCH    DALLAS

30    SALES    CHICAGO

40    OPERATIONS    BOSTON

emp.tsv

7369    SMITH    CLERK    7902    1980-12-17    800.00        20

7499    ALLEN    SALESMAN    7698    1981-02-20    1600.00    300.00    30

7521    WARD    SALESMAN    7698    1981-02-22    1250.00    500.00    30

7566    JONES    MANAGER    7839    1981-04-02    2975.00        20

7654    MARTIN    SALESMAN    7698    1981-09-28    1250.00    1400.00    30

7698    BLAKE    MANAGER    7839    1981-05-01    2850.00        30

7782    CLARK    MANAGER    7839    1981-06-09    2450.00        10

7788    SCOTT    ANALYST    7566    1987-04-19    3000.00        20

7839    KING    PRESIDENT        1981-11-17    5000.00        10

7844    TURNER    SALESMAN    7698    1981-09-08    1500.00    0.00    30

7876    ADAMS    CLERK    7788    1987-05-23    1100.00        20

7900    JAMES    CLERK    7698    1981-12-03    950.00        30

7902    FORD    ANALYST    7566    1981-12-03    3000.00        20

7934    MILLER    CLERK    7782    1982-01-23    1300.00        10

2、上传表到hdfs上

/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir -p /user/hadoop/hbase/scott/dept

/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put /opt/datas/dept.tsv /user/hadoop/hbase/scott/dept

/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -mkdir -p /user/hadoop/hbase/scott/emp

/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put /opt/datas/emp.tsv /user/hadoop/hbase/scott/emp

3、hbase中创建dept表和emp表

hbase(main):042:0* create 'dept','info'

0 row(s) in 0.5810 seconds

=> Hbase::Table - dept

hbase(main):043:0> create 'emp','info'

0 row(s) in 0.2290 seconds

4、通过以下脚本转换dept.tsv和emp.tsv文件为hfile格式文件:

export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
         ${HADOOP_HOME}/bin/yarn jar \

${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv \

-Dimporttsv.columns=HBASE_ROW_KEY,\

info:dname,info:loc \

-Dimporttsv.bulk.output=hdfs://chavin.king:9000/user/hadoop/hbase/deptfile \

dept \

hdfs://chavin.king:9000/user/hadoop/hbase/scott/dept

export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
         ${HADOOP_HOME}/bin/yarn jar \

${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv \

-Dimporttsv.columns=HBASE_ROW_KEY,\

info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno \

-Dimporttsv.bulk.output=hdfs://chavin.king:9000/user/hadoop/hbase/empfile \

emp \

hdfs://chavin.king:9000/user/hadoop/hbase/scott/emp

5、通过以下脚本将步骤4产生文件导入到目标表

export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
         ${HADOOP_HOME}/bin/yarn jar \

${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar \

completebulkload \

hdfs://chavin.king:9000/user/hadoop/hbase/deptfile \

dept

export HBASE_HOME=/opt/cdh-5.3.6/hbase-0.98.6-cdh5.3.6

export HADOOP_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
         ${HADOOP_HOME}/bin/yarn jar \

${HBASE_HOME}/lib/hbase-server-0.98.6-cdh5.3.6.jar \

completebulkload \

hdfs://chavin.king:9000/user/hadoop/hbase/empfile \

emp