I have the simple SparkR program below, which creates a SparkR DataFrame and then retrieves/collects data from it.
Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master = "yarn-client",
                  sparkEnvir = list(spark.shuffle.service.enabled = TRUE,
                                    spark.dynamicAllocation.enabled = TRUE,
                                    spark.dynamicAllocation.initialExecutors = "40"))
hiveContext <- sparkRHive.init(sc)
n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)
xs
head(xs)
collect(xs)
I can create the DataFrame and view its information successfully, but any operation that fetches data throws the error below.
16/07/25 16:33:59 WARN TaskSetManager: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
    at java.net.ServerSocket.implAccept(ServerSocket.java:530)
    at java.net.ServerSocket.accept(ServerSocket.java:498)
    at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/07/25 16:33:59 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job
16/07/25 16:33:59 ERROR RBackendHandler: dfToCols on org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
    at java.net.ServerSocket.implAccept(ServerSocket.java:530)
    at java.net.ServerSocket.accept(ServerSocket.java:498)
    at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPar
If I execute it from the sparkR command line as below, it runs fine.
~/Downloads/spark-1.6.1-bin-hadoop2.6/bin/sparkR --master yarn-client
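For reference (not part of the original post), the Spark properties passed via sparkEnvir in the script can presumably also be supplied to this launcher with --conf flags, since bin/sparkR forwards its options to spark-submit; this is an untested sketch using the same settings:
# Untested sketch: the dynamic-allocation settings from sparkEnvir, passed as --conf flags
~/Downloads/spark-1.6.1-bin-hadoop2.6/bin/sparkR --master yarn-client \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.initialExecutors=40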
But when I execute it via R with sparkR.init(master = "yarn-client"), it throws the error.
Can someone please help resolve these errors?
1 Answer
Adding this line made the difference:
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")
Here is the full code:
Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")
sc <- sparkR.init(sparkEnvir = list(spark.shuffle.service.enabled = TRUE,
                                    spark.dynamicAllocation.enabled = TRUE,
                                    spark.dynamicAllocation.initialExecutors = "40"))
hiveContext <- sparkRHive.init(sc)
n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)
xs
head(xs)
collect(xs)
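As a quick sanity check (not in the original post; the variable name below is illustrative), collect() should now return the full local data.frame created above:
local_xs <- collect(xs)          # pull the Spark DataFrame back into local R memory
stopifnot(nrow(local_xs) == n)   # should match the 1000 rows generated earlier
head(local_xs)                   # first rows of the local copy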