通过DeveloperApi获取spark程序执行进度及异常

时间:2023-03-08 17:07:42

在应用spark时,经常要获取任务的执行进度,可以参照jobProgressListener的设计来完成该功能。

以下代码仅供参考,欢迎交流。

效果显示:

通过DeveloperApi获取spark程序执行进度及异常

代码:

package org.apache.spark.zpc.listener

import org.apache.spark.Logging
import org.apache.spark.scheduler._ import scala.collection.mutable /**
* Spark 的 DeveloperApi 提供针对app, job, task的执行监听。
* 通过该监听,可以实现:
* 1.任务执行进度的粗略计算。
* 2.执行异常失败时,获取异常信息。
* 3.获取app启动的appId,从而可以控制杀死任务。
* 4.自定义进度和异常的handle处理(如控制台打印,保存db,或jms传输到web等终端
*
* @param jobNum Application中Job个数。可以通过代码的提交查看spark日志查看到。
*/ abstract class SparkAppListener(jobNum: Int) extends SparkListener with Logging { //Job和Job信息(包括总task数,当前完成task数,当前Job百分比)的映射
private val jobToJobInfo = new mutable.HashMap[Int, (Int, Int, Int)]
//stageId和Job的映射,用户获取task对应的job
private val stageToJob = new mutable.HashMap[Int, Int]
//完成的job数量
private var finishJobNum = 0
private var hasException: Boolean = false override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = synchronized {
val appId = applicationStart.appId
//记录app的Id,用于后续处理:
//如:yarn application -kill appId
//handleAppId(appId)
} //获取job的task数量,初始化job信息
override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
val jobId = jobStart.jobId
val tasks = jobStart.stageInfos.map(stageInfo => stageInfo.numTasks).sum
jobToJobInfo += (jobId ->(tasks, 0, 0))
jobStart.stageIds.map(stageId => stageToJob(stageId) = jobId)
} //task结束时,粗略估计当前app执行进度。
//估算方法:当前完成task数量/总task数量。总完成task数量按(job总数*当前job的task数。)
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val stageId = taskEnd.stageId
val jobId = stageToJob.get(stageId).get
val (totalTaskNum: Int, finishTaskNum: Int, percent: Int) = jobToJobInfo.get(jobId).get
val currentFinishTaskNum = finishTaskNum + 1
val newPercent = currentFinishTaskNum * 100 / (totalTaskNum * jobNum)
jobToJobInfo(jobId) = (totalTaskNum, currentFinishTaskNum, newPercent) if (newPercent > percent) {
//hanlde application progress
val totalPercent = jobToJobInfo.values.map(_._3).sum
if (totalPercent <= 100){
// handleAppProgress(totalPercent)
}
}
} //job 结束,获取job结束的状态,异常结束可以将异常的类型返回处理。
// handle处理自定义,比如返回给web端,显示异常log。
override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
jobEnd.jobResult match {
case JobSucceeded => finishJobNum += 1
case JobFailed(exception) if !hasException =>
hasException = true // handle application failure
// handleAppFailure(exception)
case _ =>
}
} //app结束时,将程序执行进度标记为 100%。
//缺陷:SparkListenerApplicationEnd没有提供app的Exception的获取。这样,当程序在driver端出错时,
//获取不到出错的具体原因返回给前端,自定义提示。比如(driver对app中的sql解析异常,还没有开始job的运行) /*** driver 端异常可通过主程序代码里 try catch获取到 ***/
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) = synchronized {
val totalJobNum = jobToJobInfo.keySet.size
val totalPercent = jobToJobInfo.values.map(_._3).sum
//handle precision lose
if (!hasException && totalPercent == 99) {
// handleAppProgress(100)
}
val msg = "执行失败"
if(totalJobNum == 0){
handleAppFailure(new Exception(msg))
}
}
}

博客记录是个好习惯,计划一下以后几期的博客。

由浅入深,围绕机器学习的主题,来学习介绍。

模型评估与选择 
 线性模型
 决策树
 神经网络
 支持向量机
 贝叶斯分类器
  贝叶斯决策论 
  极大似然估计 
  朴素贝叶斯分类器 
  半朴素贝叶斯分类器 
  贝叶斯网 
 集成学习
  个体与集成
  Boosting
  Bagging与随机森林
  Bagging
  随机森林 
 聚类 
 降维与度量学习 
 特征选择与稀疏学习
 概率图模型
   隐马尔可夫模型
   马尔可夫随机场
 规则学习
 强化学习
 深度学习系列