在做的项目里面已经开始用到spark,其中一个项目的场景是这样的:
后台有一个activeMq的listener进程(它是一个守护进程),监听mq队列的消息。 每次收到一个消息,listener进程就会根据消息的内容(其实就是任务的参数,用于生成RDD),提交一个spark任务去运行。这里spark不是streaming,只是一个批处理任务。
我们都知道,activeMq的监听进程是守护进程,不会退出。也就是spark的Driver进程永远不会退出,当使用时间比较长的时候,我们在系统的/tmp目录下,发现大量的spark-<uuid> 的目录,每个目录里面有任务运行的jar包(我们任务依赖于大量的其他项目,用了sparkContext的setJarByClass设置jar包)。每个jar包有100M, 所以如果频繁提交spark任务,每次任务都会产生jar包,那将很快爆盘。(是的,我们就是被爆菊了。。)
其实查下官网的配置文档,我们很容易发现有相关的两个配置可以使用,但是它们其实并不是本次事件的真凶。两个配置分别介绍下:
spark.local.dir<span style="white-space:pre"></span> spark各个worker节点上任务的shuffle map阶段的输出、以及当RDD使用磁盘存储时所用的目录。 在0.9以及之前版本智能在每个application的代码里面设置该参数,1.0的时候引入了<span style="font-family: 'Helvetica Neue', Arial, sans-serif; font-size: 13px; line-height: 18.84375px; background-color: rgb(252, 252, 252);">SPARK_LOCAL_DIRS 这个变量来全局配置。</span>
<span style="font-family: 'Helvetica Neue', Arial, sans-serif; font-size: 13px; line-height: 18.84375px; background-color: rgb(252, 252, 252);"><span style="color: rgb(68, 68, 68); font-family: Menlo, 'Lucida Console', monospace; line-height: 20px;">SPARK_WORKER_DIR 1.0以上版本才有,用于设置每个worker节点的application运行时使用的目录(所以jar包会拷贝一份在该目录,并且在该目录运行application在该节点上面的java程序)</span></span>
我们一开始用的是0.9,后来升到1.0.2版本,尽管配置了上面两个选项,在系统的tmp目录依然发现会有大量临时目录(包含jar包)的文件产生。尽管多次的提交任务和观察发现,只有在Driver所在的机器才会有这个问题,其他的worker机器上面尽管在系统的tmp目录(linux下的/tmp)产生了文件,但是任务运行后,会自动清理掉。所以问题出在Driver端。
后来搜索源码,发现创建临时目录的地方用的是jvm的 java.io.tmpdir属性(默认是系统的tmp目录):
(见org/apache/spark/util/Utils.scala)
/** Create a temporary directory inside the given parent directory */
def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
var attempts = 0
val maxAttempts = 10
var dir: File = null
while (dir == null) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException("Failed to create a temp directory (under " + root + ") after " +
maxAttempts + " attempts!")
}
try {
dir = new File(root, "spark-" + UUID.randomUUID.toString)
if (dir.exists() || !dir.mkdirs()) {
dir = null
}
} catch { case e: IOException => ; }
}
registerShutdownDeleteDir(dir)
// Add a shutdown hook to delete the temp dir when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
override def run() {
// Attempt to delete if some patch which is parent of this is not already registered.
if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
}
})
dir
}
而删除目录的地方:
<pre name="code" class="java"> Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
override def run() {
// Attempt to delete if some patch which is parent of this is not already registered.
if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
}
})
这里用的是jvm的shutdown hook,Driver进程一直不退出,自然就不会触发这个hook。。
另外,即使kill掉Driver进程,也不能用kill -9这么暴力的方式,因为jvm的hook是在正常退出时触发的,如果用kill -9则会强制立即杀死进程,它来不及触发相关的清理操作。