Spark Streaming 插入 HBase

时间:2022-01-23 14:46:41

import java.sql.{DriverManager, ResultSet}

import org.apache.spark._
import org.apache.spark.streaming._ import scala.util.Random import org.apache.hadoop.hbase.{HTableDescriptor,HColumnDescriptor,HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put,Table}
import java.nio.charset.StandardCharsets
import scala.util.control.NonFatal

/**
 * Spark Streaming job that reads text lines from a socket, splits each line on
 * '|' and writes the first two fields into an HBase table (field 0 is used as
 * the column qualifier, field 1 as the cell value).
 *
 * Note: `main` does not call [[CreatTableIfNotFind]]; the target table is only
 * created if a caller invokes that helper explicitly.
 */
object Pi {
  // MySQL connection settings. Within this object they are only referenced by
  // the disabled JDBC sample kept at the end of `main`.
  val user = "root"
  val password = "root"
  val host = "10.8.8.123"
  val database = "db_1"
  val port = 3306
  val conn_str = "jdbc:mysql://" + host + ":" + port + "/" + database

  /** Name of the HBase table written by `main`. */
  val tablename = "achi"

  /** Column family used for table creation and for every write issued by `main`. */
  val cf = "a"

  /** A column qualifier name; not referenced by the other members of this object. */
  val qulified = "name"

  /**
   * Creates `userTable` with the single column family [[cf]] when it does not
   * exist yet; otherwise only reports that the table is already present.
   *
   * @param conn      open HBase connection; it is NOT closed by this method
   * @param userTable name of the table to look up / create
   */
  def CreatTableIfNotFind(conn: Connection, userTable: TableName): Unit = {
    // Obtain the Admin object from the Connection (equivalent to the former HAdmin).
    val admin = conn.getAdmin
    try {
      if (admin.tableExists(userTable)) {
        println("Table exists!")
        // admin.disableTable(userTable)
        // admin.deleteTable(userTable)
        // exit()
      } else {
        val tableDesc = new HTableDescriptor(userTable)
        tableDesc.addFamily(new HColumnDescriptor(cf.getBytes(StandardCharsets.UTF_8)))
        admin.createTable(tableDesc)
        println("Create table success!")
      }
    } finally {
      // The Admin handle is created here, so it is released here as well.
      admin.close()
    }
  }

  /**
   * Writes a single cell into `table`.
   *
   * All strings are encoded as UTF-8 so the stored bytes do not depend on the
   * JVM's platform default charset.
   *
   * @param table    HBase table to write to
   * @param cf       column family
   * @param qulified column qualifier
   * @param value    cell value
   * @param rowKey   row to write; defaults to "id001", the key that was
   *                 previously hard-coded, so existing callers keep writing to
   *                 that same row
   */
  def InsertHbase(table: Table, cf: String, qulified: String, value: String,
                  rowKey: String = "id001"): Unit = {
    val put = new Put(rowKey.getBytes(StandardCharsets.UTF_8))
    put.addColumn(
      cf.getBytes(StandardCharsets.UTF_8),
      qulified.getBytes(StandardCharsets.UTF_8),
      value.getBytes(StandardCharsets.UTF_8))
    table.put(put)
  }

  /**
   * Entry point: starts a local streaming context with a 3 second batch
   * interval, reads lines from localhost:9999 and stores every record in HBase.
   * Blocks until the streaming context terminates.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.map(_.split('|'))
    words.print()

    words.foreachRDD { rdd =>
      rdd.foreachPartition { pa =>
        // One HBase connection per partition, created inside the partition closure.
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        try {
          val table = conn.getTable(TableName.valueOf(tablename))
          try {
            pa.foreach {
              case Array(qualifier, value, _*) =>
                try {
                  val beg = System.currentTimeMillis()
                  println(qualifier + value)
                  InsertHbase(table, cf, qualifier, value)
                  println("***************************************************************")
                  println(" 耗时: " + (System.currentTimeMillis() - beg) + "ms")
                  println("***************************************************************")
                } catch {
                  // Keep processing the remaining records, but do not hide the cause.
                  case NonFatal(e) =>
                    println("raw error!")
                    e.printStackTrace()
                }
              case _ =>
                // Fewer than two '|'-separated fields: nothing can be written.
                println("raw error!")
            }
          } finally {
            table.close()
          }
        } finally {
          // Runs even when getTable or a write fails, so the connection never leaks.
          conn.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()

    /* Disabled JDBC sample, kept for reference:
    Class.forName("com.mysql.jdbc.Driver").newInstance();
    val conn1 = DriverManager.getConnection(conn_str,user,password)
    try {
      val statement = conn1.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      val rs = statement.executeQuery("select * from achi limit 10")
      while (rs.next) {
        println(rs.getString(1))
      }
    }
    catch {
      case _ : Exception => println("===>")
    }
    finally {
      conn1.close
    }
    */
  }
}
// sbt build definition for the Spark Streaming -> HBase sample project.

name := "untitled"

version := "1.0"

scalaVersion := "2.10.6"

// Versions shared by several artifacts below.
val sparkVersion = "1.5.2"

val hbaseVersion = "1.1.3"

// NOTE(review): this resolver uses plain HTTP - confirm it is still reachable
// and acceptable for the sbt version in use.
resolvers += "OS China" at "http://maven.oschina.net/content/groups/public/"

libraryDependencies ++= Seq(
  "mysql"            %  "mysql-connector-java" % "5.1.38",
  "org.apache.spark" %% "spark-core"           % sparkVersion,
  "org.apache.spark" %% "spark-streaming"      % sparkVersion,
  "org.apache.hbase" %  "hbase-client"         % hbaseVersion,
  "org.apache.hbase" %  "hbase-common"         % hbaseVersion,
  "org.apache.hbase" %  "hbase-server"         % hbaseVersion
)