Spark Streaming实时写入数据到HBase

时间:2023-01-15 20:46:34

一、概述

  在实时应用之中,难免会遇到往NoSql数据如HBase中写入数据的情景。题主在工作中遇到如下情景,需要实时查询某个设备ID对应的账号ID数量。踩过的坑也挺多,举其中之一,如一开始选择使用NEO4J图数据库存储设备和账号的关系,当然也有其他的数据,最终构成一个复杂的图关系,但是这个图数据库免费版是单机安装(集群要收费),在实时写入和查询关系的时候,导致我们一台服务器内存和cpu损耗严重,为了保证Hadoop集群的稳定性,只好替换掉这个数据库,采用流行的HBase。本文就HBase的使用心得做如下记录。

二、解决方案

  1.rowkey设计:设备id是32位字母、数字组成的串,考虑到HBase长表扫描的查询最快,所以rowkey的设计方式为,设备ID+账号ID拼接而成,这样在扫描某个设备ID时会很快计算出条数。

2.HBase表设计:在创建表的时候采用预分区建表,因为这样的,如果知道hbase数据表的rowkey的分布情况,就可以在建表的时候对hbase进行region的预分区,这样做的好处是防止大数据量插入的热点问题,提高数据插入的效率。rowkey是字母或者数字开头,所以建表语句如下(数据量再大的时候还可以在细分分区):

create 'T_TEST', 'data', SPLITS => ['0', '1','2', '3','4', '5','6','7','8','9','a', 'b', 'c', 'd', 'e', 'f', 'g']

此处入坑:创建表的时候将HBase表映射到Hive外部表,语句如下。这样做是为了方便导入历史数据,但是Hive跑批将历史数据导入之后,从HBase查询已经导入的某一数据的时候,无法查询导数据,也无法通过API写入到HBase,这个问题很诡异,后来想了下Hive导入的数据编码和HBase的不同,于是重新将表删除,不采用映射表,直接使用Spark将历史数据导入,问题解决。

CREATE external TABLE tmp.H_T_TEST(key string ,num string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,data:num")
TBLPROPERTIES ("hbase.table.name" = "T_TEST");

3.设计好rowkey和表之后,我们就开始写Spark代码了。

此处入坑,我把HBase的连接池写在了和Spark的同一位置,这样会遇到一个问题,Spark程序运行的时候报HBaseConnection没有序列化,按照网上的做法,将对象加上 @transient注解,虽然不报错误,还是无法将数据写入到Hba之中。后来经过查找,找到了解决办法,将HBase的连接放到消息的循环之内,即一个分区建立一个HBase连接,代码如下。

def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkUtil.createSparkContext(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
    val messages = SparkUtil.createDStreamFromKafka(
      "T_TEST",
      topicSet,
      ssc)//创建消息接收器

    messages.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {//循环分区
        try {
          val connection = HBaseUtil.getHbaseConn //获取HBase连接,分区创建一个连接,分区不跨节点,不需要序列化
          partitionRecords.foreach(s => {
            val data = JSON.parseObject(s._2)//将数据转化成JSON格式
            val tableName = TableName.valueOf("T_TEST")
            val table = connection.getTable(tableName)//获取表连接

            val put = new Put(Bytes.toBytes(data.getString("id1") + "_" + data.getString("id2")))
            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("num"), Bytes.toBytes("1"))

            Try(table.put(put)).getOrElse(table.close())//将数据写入HBase,若出错关闭table
            table.close()//分区数据写入HBase后关闭连接
          })
        } catch {
          case e: Exception => logger.error("写入HBase失败,{}", e.getMessage)
        }
      })
    })
    ssc.start()
    ssc.awaitTermination()

  }

至此问题解决,数据正常,还没出现过问题,等待时间验证吧。

4.历史数据导入,在导入历史数据的时候,由于数据放在了Hive的两个不同表之中,一开始想要一次性读入,使用Spark SQL的dataframe,创建一个hivecontext,写HiveSQL将两个表结果执行union all操作,但是Spark程序报rpc错误。将两个表的结果分别查出,使用dataframe 的union all操作,也是不行,也是rpc错误,查了很多资料,还是没解决,莫名其妙的错误,后来两个表分开执行导入历史数据,问题不再出现,可能Spark还是不够成熟,总是遇到莫名其妙的问题。

三、总结

  在使用Hbase的时候要预分区。不要为了方便使用Hive外部映射表。HBase的连接池要放在分区循环开始的地方,不然创建很多的连接,会导致HBase垮掉。