Spark 2.1.0, running on Spark's own standalone cluster.
R 3.4.
The script is written in SparkR.
Problem details
The R script does a large amount of computation: a single computation takes about one hour, and the whole script needs to perform tens of thousands of them. Every time I submit it with spark-submit, the cluster automatically kills the job after about 6000 seconds of running; none of the master or slave logs show any exception, and over many attempts it always stops at 6000 seconds.
If I scale the work down so that a single computation takes only a few minutes and runs a few thousand times, then as long as the total time stays under 6000 seconds the whole program runs through without any problem.
The command used (it is launched from a program, so an SSH timeout is not the issue; overall memory usage stays under 100 MB, so it cannot be an out-of-memory problem either):
/spark-submit /home/spark/work/201708011954249240024.R --master spark://master:7077 --driver-memory 1G --driver-cores 1
The cluster session is obtained in the script with:
sparkR.session("spark://master:7077", "债券组合", "/app/spark/spark-2.1.0/",list(spark.executor.memory="1G"))
The distributed computation is run with the line below; proList is the data set, and doOneProgramme performs the repeated simulations on each item:
allproResult <- spark.lapply(proList,doOneProgramme)
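To give a sense of the shape of the workload, here is a stripped-down, purely hypothetical version of the two objects (the real doOneProgramme is the hour-long simulation and proList holds the real portfolio items; the bodies below are placeholders only):
# Hypothetical placeholders that only illustrate the calling pattern of spark.lapply.
proList <- as.list(seq_len(20000))            # tens of thousands of work items
doOneProgramme <- function(pro) {
  Sys.sleep(1)                                # stand-in for roughly one hour of simulation
  list(item = pro, result = sum(rnorm(1000))) # dummy result
}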
Additional note:
spark.r.backendConnectionTimeout
spark.r.heartBeatInterval
I have tried changing both of these settings; neither solved the problem.
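For reference, these settings would normally be passed either through sparkConfig when the session is created or via --conf on spark-submit; a sketch with purely illustrative values (in my case changing them made no difference to the 6000-second cutoff):
# Illustrative sketch only: passing the two timeouts through sparkConfig.
# They can equally be set with --conf spark.r.backendConnectionTimeout=... on spark-submit.
sparkR.session("spark://master:7077", "债券组合", "/app/spark/spark-2.1.0/",
               list(spark.executor.memory            = "1G",
                    spark.r.backendConnectionTimeout = "43200",   # seconds, illustrative
                    spark.r.heartBeatInterval        = "100"))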
Additional log:
Error in handleErrors(returnStatus, conn) :
No status is returned. Java SparkR backend might have failed.
Calls: spark.lapply ... .local -> callJMethod -> invokeJava -> handleErrors
Execution halted
17/08/02 01:47:21 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 3
17/08/02 01:47:21 INFO SparkContext: Invoking stop() from shutdown hook
17/08/02 01:47:21 INFO SparkUI: Stopped Spark web UI at http://192.168.201.53:4040
17/08/02 01:47:21 INFO DAGScheduler: ResultStage 0 (collect at NativeMethodAccessorImpl.java:0) failed in 6000.841 s due to Stage cancelled because SparkContext was shut down
17/08/02 01:47:21 INFO DAGScheduler: Job 0 failed: collect at NativeMethodAccessorImpl.java:0, took 6001.066492 s
17/08/02 01:47:21 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@b496f41)
17/08/02 01:47:21 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1501609641621,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
17/08/02 01:47:21 INFO StandaloneSchedulerBackend: Shutting down all executors
17/08/02 01:47:21 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
17/08/02 01:47:21 ERROR RBackendHandler: collect on 8 failed
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:167)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:108)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:40)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:806)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1668)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1587)
at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1826)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1825)
at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
... 36 more
17/08/02 01:47:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/08/02 01:47:21 INFO MemoryStore: MemoryStore cleared
17/08/02 01:47:21 INFO BlockManager: BlockManager stopped
17/08/02 01:47:21 INFO BlockManagerMaster: BlockManagerMaster stopped
17/08/02 01:47:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/08/02 01:47:21 DEBUG PoolThreadCache: Freed 5 thread-local buffer(s) from thread: shuffle-server-4-2
17/08/02 01:47:21 INFO SparkContext: Successfully stopped SparkContext
17/08/02 01:47:21 INFO ShutdownHookManager: Shutdown hook called
17/08/02 01:47:21 INFO ShutdownHookManager: Deleting directory /tmp/spark-bb5c8576-7f04-4b49-8d9d-3199d0786d98
3 solutions
#1
#2
@小小孟子 Have you run into this as well?
#3
Let me share the fix I found. I read the SparkR source; in sparkR.R there is
connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "60000"))
No matter how I set the SPARKR_BACKEND_CONNECTION_TIMEOUT environment variable, the value kept getting overridden, so I brute-forced it to
connectionTimeout <- 99999999
I thought that would be the end of it, but further down it gets overwritten again:
f <- file(path, open = "rb")
backendPort <- readInt(f)
monitorPort <- readInt(f)
rLibPath <- readString(f)
connectionTimeout <- readInt(f)
I could not work out where this value comes from, so I applied the same brute-force change there:
connectionTimeout <- 9999999
Then go into the R directory and run
sed -i 's/\r$//' install-dev.sh
SPARK_VERSION=2.1.0 SPARK_HADOOP_VERSION=2.7.0 ./install-dev.sh
to rebuild the SparkR lib and overwrite the installed one. Tested it myself, works great.
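To make the two edits easier to locate, this is roughly what the patched section of R/pkg/R/sparkR.R looks like afterwards (the hard-coded values are arbitrary, line positions may differ between versions, and this is a local workaround rather than a proper fix):
# First spot: the default read from the environment variable
connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "60000"))
connectionTimeout <- 99999999    # workaround: force a huge timeout regardless of the env var
...
# Second spot: values the JVM-side RBackend writes into a temp info file at startup,
# which appears to be where the read-back connectionTimeout comes from
f <- file(path, open = "rb")
backendPort <- readInt(f)
monitorPort <- readInt(f)
rLibPath <- readString(f)
connectionTimeout <- readInt(f)
connectionTimeout <- 9999999     # workaround: also override the value read back from the backend
After editing, install-dev.sh regenerates the SparkR package under R/lib in the Spark source tree, and that rebuilt package then has to replace the SparkR library the cluster actually loads.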