在使用Spark 2.2的Structured Streaming向HBase数据库写入数据时报错:Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.client.HTable
报错提示是序列化错误,一开始我误以为是Spark配置的问题,还去修改了Spark的配置文件。
然而真正的原因是我把创建HBase连接的代码写在了foreach的ForeachWriter里(成员初始化部分,即对象构造时执行的代码)。
具体原因是:ForeachWriter对象会被序列化后分发到各个executor上执行,而在其成员初始化阶段创建的HBase Connection本身不可序列化,因此序列化该对象时就会抛出NotSerializableException。正确的做法是在open()方法中创建连接(open()在executor端执行),并在close()方法中释放连接。
下面的代码是错误示例:创建HBase连接的代码不应该写在ForeachWriter的成员初始化处,而应放在open()方法中。
import spark.implicits._
println("========================= 计算每个小时实时的应用的使用时间 ==================================")
println("========================= 需要输入查找天数参数======================================")
var day = "2018-03-14"
// Read the given day's behavior logs as a file-based streaming source,
// using the externally defined `schema`.
val hourData = spark
.readStream
.schema(schema)
.json("D:\\data\\behavior\\"+day+"*.log")
// Explode the nested "data" array into one row per element, then sum
// activetime per package over a 10-minute window sliding every 5 minutes.
val appActiveTimeByHour = hourData
.withColumn("data", explode($"data"))
.select("day","endtime","data.*")
.groupBy(
window($"endtime", "10 minutes", "5 minutes"),
$"package"
)
.sum("activetime")
.writeStream
.outputMode("complete")
.foreach(
// NOTE(review): this ForeachWriter is the BUGGY version the article is
// about. The writer instance is serialized on the driver and shipped to
// executors; the Configuration/Connection fields created below during
// member initialization are captured in that serialized object, and
// Connection is not serializable — hence the
// java.io.NotSerializableException. The fix is to create the Connection
// inside open() (which runs on the executor) and release it in close().
new ForeachWriter[Row]{
println("========================= 开始======================================")
// HBase client configuration (BUG: built at construction time on the
// driver, so it travels with the serialized writer).
val hbaseconf: Configuration = HBaseConfiguration.create()
hbaseconf.set("hbase.zookeeper.quorum",zookeeperservers)
hbaseconf.set("hbase.zookeeper.property.clientPort", "2181")
//hbaseconf.set("zookeeper.znode.parent","/hbase-unsecure")
// BUG: Connection is not serializable; creating it here (in the field
// initializer, on the driver) is the root cause of the
// "Task not serializable" error. It belongs in open() instead.
var connection: Connection = ConnectionFactory.createConnection(hbaseconf)
println("========================= 结束======================================")
// Called once per partition/epoch before any process() call; this is
// where the HBase connection should have been created.
def open(partitionId:Long,version:Long):Boolean={
true
}
// Writes one aggregated row to HBase. After the groupBy above, the row
// layout is (0: window struct, 1: package, 2: sum(activetime)).
def process(record:Row):Unit={
println(record.toString)
println(record.get(0).toString)
println(record.get(1).toString)
println(record.get(2).toString)
println("========================= ggg======================================")
// NOTE(review): `conf` below is created but never used. Also every
// addColumn reuses record.get(0) — presumably get(1)/get(2) were
// intended for "package"/"activetime"; verify the intended row key
// and column values. `table` is never closed either.
val conf = HBaseConfiguration.create()
val table = connection.getTable(TableName.valueOf(tablename))
val theput= new Put(Bytes.toBytes(record.get(0).toString))
theput.addColumn(Bytes.toBytes("info"),Bytes.toBytes("package"),Bytes.toBytes(record.get(0).toString))
theput.addColumn(Bytes.toBytes("info"),Bytes.toBytes("day"),Bytes.toBytes(record.get(0).toString))
theput.addColumn(Bytes.toBytes("info"),Bytes.toBytes("endtime"),Bytes.toBytes(record.get(0).toString))
theput.addColumn(Bytes.toBytes("info"),Bytes.toBytes("activetime"),Bytes.toBytes(record.get(0).toString))
table.put(theput)
}
// Called when the partition write finishes OR fails (errorOrNull is null
// on success). NOTE(review): the message below prints unconditionally —
// even on success — and the connection is never closed here.
def close(errorOrNull:Throwable):Unit={
println("存入hbase出错")
}
}
)
.queryName("HourlyUsage")
.format("foreach")
//.format("console")
//.trigger( Trigger.ProcessingTime("20 seconds"))
.start()
// Block the driver until the streaming query terminates.
appActiveTimeByHour.awaitTermination()
println("=================================================================================")
}
}