Requirement Background
An existing requirement calls for using Spark to query data from an HBase database and sync it into an intermediate analytics store. This note records a simple example of integrating Spark with HBase.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
object ReadHBaseData {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder()
      .appName("ReadHBaseData")
      .master("local")
      .getOrCreate()

    // Create the HBase configuration
    val conf = HBaseConfiguration.create()
    // Set the HBase connection parameters
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // Open an HBase connection
    val connection = ConnectionFactory.createConnection(conf)
    // Get a reference to the HBase table
    val tableName = "my_table"
    val table = connection.getTable(TableName.valueOf(tableName))

    // Create an HBase scan object
    val scan = new Scan()
    // Restrict the scan to the column family and columns we need
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column1"))
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column2"))

    // Execute the scan
    val scanner = table.getScanner(scan)
    // Iterate over the scan results on the driver and turn them into an RDD
    // (toList materializes all rows in driver memory, so this suits small tables)
    val rdd = spark.sparkContext.parallelize(scanner.iterator().asScala.map(result => {
      val rowKey = Bytes.toString(result.getRow)
      val value1 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1")))
      val value2 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2")))
      (rowKey, value1, value2)
    }).toList)

    // Convert the RDD to a DataFrame
    val df = spark.createDataFrame(rdd).toDF("rowKey", "value1", "value2")
    // Show the DataFrame contents
    df.show()

    // Close the HBase resources
    scanner.close()
    table.close()
    connection.close()
    // Stop the SparkSession
    spark.stop()
  }
}
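Note that the example above pulls every row through a single scanner on the driver (parallelize(... .toList)), which works for small tables but will not scale. For larger tables, a common alternative is to let Spark read the HBase regions in parallel via TableInputFormat and newAPIHadoopRDD (this requires the hbase-mapreduce artifact on the classpath). Below is a minimal sketch under that assumption, reusing the same table and column names as above:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object ReadHBaseDistributed {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadHBaseDistributed")
      .master("local")
      .getOrCreate()

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // Tell TableInputFormat which table and columns to scan
    conf.set(TableInputFormat.INPUT_TABLE, "my_table")
    conf.set(TableInputFormat.SCAN_COLUMNS, "cf:column1 cf:column2")

    // Each HBase region becomes one RDD partition, so the scan runs on the executors
    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Map each Result to a tuple, exactly as in the driver-side example
    val rowsRDD = hbaseRDD.map { case (_, result) =>
      val rowKey = Bytes.toString(result.getRow)
      val value1 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1")))
      val value2 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2")))
      (rowKey, value1, value2)
    }

    val df = spark.createDataFrame(rowsRDD).toDF("rowKey", "value1", "value2")
    df.show()
    spark.stop()
  }
}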
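Finally, the requirement mentions syncing the result into an intermediate analytics store. Assuming that store is reachable over JDBC (the URL, target table, and credentials below are placeholders, not part of the original example), the DataFrame df built above can be written with Spark's built-in JDBC sink before spark.stop():

import java.util.Properties

// Hypothetical connection details for the intermediate analytics store
val jdbcUrl = "jdbc:mysql://localhost:3306/analytics"
val props = new Properties()
props.setProperty("user", "analytics_user")
props.setProperty("password", "analytics_password")

// Append the scanned rows into the target table
df.write
  .mode("append")
  .jdbc(jdbcUrl, "hbase_snapshot", props)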