spark数据批量导入hbase

时间:2024-03-08 09:22:48

参考:

https://www.jianshu.com/p/b6c5a5ba30af

 

https://www.iteblog.com/archives/1891.html

 

https://www.iteblog.com/archives/1889.html

 

 

https://www.imooc.com/article/69269

 

https://blog.csdn.net/lrxcmwy2/article/details/81592001

 

 

 

 

spark任务的实现(生成hfile和导入到hbase是在一个任务里面)

 

 

 

 

 

4. 写数据的优化:Bulk Load

以上写数据的过程将数据一条条插入到Hbase中,这种方式运行慢且在导入的过程的占用Region资源导致效率低下,所以很不适合一次性导入大量数据,解决办法就是使用 Bulk Load 方式批量导入数据。

Bulk Load 方式由于利用了 HBase 的数据信息是按照特定格式存储在 HDFS 里的这一特性,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后完成巨量数据快速入库的操作,配合 MapReduce 完成这样的操作,不占用 Region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。

Bulk Load 的实现原理是通过一个 MapReduce Job 来实现的,通过 Job 直接生成一个 HBase 的内部 HFile 格式文件,用来形成一个特殊的 HBase 数据表,然后直接将数据文件加载到运行的集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。

接下来介绍在spark中如何使用 Bulk Load 方式批量导入数据到 HBase 中。

package com.bonc.rdpe.spark.hbase

import com.alibaba.fastjson.JSON
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author: YangYunhe
  * Description: 
  * Create: 2018/7/24 13:14
  */
object BulkLoad {

  val zookeeperQuorum = "172.16.13.185:2181"
  val dataSourcePath = "file:///D:/data/news_profile_data.txt"
  val hdfsRootPath = "hdfs://beh/"
  val hFilePath = "hdfs://beh/test/yyh/hbase/bulkload/hfile/"
  val tableName = "news"
  val familyName = "cf1"
  val qualifierName = "title"

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", hdfsRootPath)
    val fileSystem = FileSystem.get(hadoopConf)
    val hbaseConf = HBaseConfiguration.create(hadoopConf)
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin

    // 0. 准备程序运行的环境
    // 如果 HBase 表不存在,就创建一个新表
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      val hcd = new HColumnDescriptor(familyName)
      desc.addFamily(hcd)
      admin.createTable(desc)
    }
    // 如果存放 HFile文件的路径已经存在,就删除掉
    if(fileSystem.exists(new Path(hFilePath))) {
      fileSystem.delete(new Path(hFilePath), true)
    }

    // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:
    // java.io.IOException: Added a key not lexically larger than previous.

    val data = sc.textFile(dataSourcePath)
      .map(jsonStr => {
        // 处理数据的逻辑
        val jsonObject = JSON.parseObject(jsonStr)
        val rowkey = jsonObject.get("id").toString.trim
        val title = jsonObject.get("title").toString.trim
        (rowkey, title)
      })
      .sortByKey()
      .map(tuple => {
        val kv = new KeyValue(Bytes.toBytes(tuple._1), Bytes.toBytes(familyName), Bytes.toBytes(qualifierName), Bytes.toBytes(tuple._2))
        (new ImmutableBytesWritable(Bytes.toBytes(tuple._1)), kv)
      })

    // 2. Save Hfiles on HDFS
    val table = hbaseConn.getTable(TableName.valueOf(tableName))
    val job = Job.getInstance(hbaseConf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoadMap(job, table)

    data.saveAsNewAPIHadoopFile(
      hFilePath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      hbaseConf
    )

    //  3. Bulk load Hfiles to Hbase
    val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

    hbaseConn.close()
    fileSystem.close()
    sc.stop()
  }
}

说明:

  • rowkey一定要进行排序
  • 上面的代码使用了saveAsNewAPIHadoopFile(),也可以使用saveAsNewAPIHadoopDataset(),把以下代码:
data.saveAsNewAPIHadoopFile(
  hFilePath,
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  hbaseConf
)

替换为:

job.getConfiguration.set("mapred.output.dir", hFilePath)
data.saveAsNewAPIHadoopDataset(job.getConfiguration)

即可。



作者:CoderJed
链接:https://www.jianshu.com/p/b6c5a5ba30af
來源:简书