Unable to retrieve data from a DataFrame created with SparkR

Time: 2022-10-19 23:08:38

I have the simple SparkR program below, which creates a SparkR DataFrame and then retrieves/collects data from it.


Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="yarn-client",sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)

I am able to create the DataFrame and view its information successfully, but any operation that actually fetches data throws the error below.


16/07/25 16:33:59 WARN TaskSetManager: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
    at java.net.ServerSocket.implAccept(ServerSocket.java:530)
    at java.net.ServerSocket.accept(ServerSocket.java:498)
    at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


16/07/25 16:33:59 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job
16/07/25 16:33:59 ERROR RBackendHandler: dfToCols on org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
    at java.net.ServerSocket.implAccept(ServerSocket.java:530)
    at java.net.ServerSocket.accept(ServerSocket.java:498)
    at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPar


If I run it from the sparkR command line as shown below, it executes successfully.


~/Downloads/spark-1.6.1-bin-hadoop2.6/bin/sparkR --master yarn-client

But when I run it from R with sparkR.init(master="yarn-client"), it throws the error.


Can someone please help resolve these errors?


1 Answer

#1



Adding this line made the difference:


Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

Here is the full code:


Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")
sc <- sparkR.init(sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)
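
A quick way to confirm that rows actually come back from the executors, and to shut the backend down afterwards (a minimal sketch; dim, collect, and sparkR.stop are the standard SparkR 1.6 calls, and the expected sizes follow from the data.frame built above):

dim(xs)                # should report 1000 rows and 2 columns
head(collect(xs), 3)   # first three rows materialized as a local R data.frame
sparkR.stop()          # stop the SparkR backend when finished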
