Problem: How to obtain the applicationId in Spark Streaming, i.e. get the appId from Scala code.
Solution: register a SparkListener to capture the appId, so that the submitted Spark job can be tracked and managed after it starts.
Reference code is given below.
Extend SparkListener.
MySparkListener.scala (listener class):

import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd}

class MySparkListener extends SparkListener {

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    println("*************************************************")
    println("app:end")
    println("*************************************************")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println("*************************************************")
    println("job:end")
    jobEnd.jobResult match {
      case JobSucceeded =>
        println("job:end:JobSucceeded")
      case _ =>
        // JobFailed is private[spark] and cannot be matched outside Spark's own
        // packages, so any non-success result falls through to this case
        println("job:end:JobFailed")
    }
    println("*************************************************")
  }

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = synchronized {
    // appId is an Option[String]; on YARN it carries the application id
    val appId = applicationStart.appId.getOrElse("")
    println("appId===========" + appId)
    // Record the appId for later handling, e.g.
    //   yarn application -kill <appId>
    // handleAppId(appId)
  }
}
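The handleAppId call above is only a placeholder in the original code. As one possible way to use the captured id, a minimal sketch that simply persists it to a local file, so an external script can later run yarn application -kill, might look like the following (the method name and file path are illustrative assumptions, not part of the original code):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

// Hypothetical helper: write the captured appId to a local file so that an
// external script or cleanup step can later run: yarn application -kill <appId>
def handleAppId(appId: String): Unit = {
  Files.write(
    Paths.get("/tmp/spark_streaming_app_id.txt"), // illustrative path, adjust as needed
    appId.getBytes(StandardCharsets.UTF_8),
    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
}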
Test (driver code, StartTask.scala):
package com.spark.task
import org.apache.spark.SparkContext._
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SQLContext
import org.apache.log4j.Logger
import com.calc.tools.RedisPoolUtil
import java.util.Properties
import com.spark.task.MySparkListener
object StartTask {
  def main(args: Array[String]): Unit = {
    val param0 = args(0)
    var properties: Properties = com.common.ReadProperty.getProperties(args)
    /* import com.dc.appengine.configclient.ConfigClient
    if (args.length == 3) {
      var c: ConfigClient = new ConfigClient(args)
      properties = c.getPropertyConfigSpark()
    } else {
      properties = com.common.ReadProperty.getProperties(args)
    }
    */
    val logger: Logger = com.common.ReadProperty.getLogger(properties)
    // build the Spark Streaming conf and context
    val conf = com.common.SparkCommon.getStreamingConf(properties, logger, args)
    val ssc = com.common.SparkCommon.getStreamingContext(properties, logger, args, conf)
    val sparkContextObj = ssc.sparkContext
    // register the listener so onApplicationStart can capture the appId
    sparkContextObj.addSparkListener(new MySparkListener)
    /*
     * Currently unused; left commented out.
     */
    val sqc = new SQLContext(sparkContextObj)
    // RunTask.registerUDF(sqc)
    // sqc.udf.register("len", (s: String) => s.length())
    // sqc.udf.register("substr", (s: String, begin: Int, end: Int) => s.substring(begin, end))
    // Run.initdata(sqc, "12010701", "", "")
    // Run.run2(sqc, "12010701", "", "")
    // consume data from Kafka via a direct stream
    val kafkaParams: Map[String, String] = com.common.KafkaCommon.getKafkaParams(properties)
    val km = com.common.KafkaCommon.getKafkaManager(properties, logger, args, kafkaParams)
    val updateZKOffsets = properties.getProperty("kafka.updateZKOffsets")
    val calckfkdatatype = properties.getProperty("calc.kafka.dataSourcetype")
    val topics = properties.getProperty("kafka.topics")
    val topicsSet = topics.split(",").toSet
    km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet).cache().transform(rdd => {
      val startTime = System.currentTimeMillis()
      /*
      var redisu = new RedisPoolUtil(properties)
      redisu.initConfig(properties)
      */
      // var shardedJedisPool: JedisPool = redisu.initRedis()
      // var shardedJedis = shardedJedisPool.getResource
      val jedisCluster = new RedisPoolUtil(properties).initRedis1(properties)
      if (!rdd.isEmpty()) {
        // Console.readLine()
        if (calckfkdatatype.equals("json")) {
          RunTask.run(rdd, sqc, conf, param0, jedisCluster, logger, properties)
        }
        if ("1".equals(updateZKOffsets)) {
          km.updateZKOffsets(rdd)
        }
        // for (i <- 0 until arrs.length) {
        //   System.out.println(arrs(i).toString())
        //   System.out.println("111111111=====>" + arrs(i)._2)
        // }
      }
      val endTime = System.currentTimeMillis()
      jedisCluster.close() // release the Redis connection
      println("-----batch processing time(ms): " + (endTime - startTime))
      // transform must return an RDD, so return a one-element dummy RDD;
      // the actual work has already been done above
      val arr = Array(1)
      rdd.sparkContext.makeRDD(arr)
    }).foreachRDD(rdd => {
      // empty output operation: still needed to trigger execution of the
      // transform above on every batch
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
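Side note: the listener approach is useful when the id has to be captured in a callback, but if the appId is only needed in the driver code itself, it can also be read directly from the context, assuming a Spark version where SparkContext.applicationId is publicly exposed (recent 1.x and 2.x releases):

// Read the application id directly in the driver
// (assumes SparkContext.applicationId is available in your Spark version)
val appId = ssc.sparkContext.applicationId
println("appId===========" + appId)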