I have a big table A and two sets of tables, B1, B2, ..., Bn and C1, C2, ..., Cn. The logic is: Bi join Ci join A. A is big and used many times, so I want to load A first and cache it, then, in each loop iteration i, load Bi and Ci and join the three tables.
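The loop looks roughly like this (a minimal sketch, not my exact code; the parquet paths, the join key "key", and the bound n are placeholders):

    // Spark 1.6-style API, matching the stack trace below; sqlContext is
    // the usual SQLContext available in the shell or built from the
    // SparkContext.
    val a = sqlContext.read.parquet("/path/to/A").cache()
    a.count() // materialize the cache once, up front

    val n = 10 // number of (Bi, Ci) pairs; placeholder
    for (i <- 1 to n) {
      val b = sqlContext.read.parquet(s"/path/to/B$i")
      val c = sqlContext.read.parquet(s"/path/to/C$i")
      // Bi join Ci join A on the shared key
      b.join(c, "key")
        .join(a, "key")
        .write.parquet(s"/path/to/out$i")
    }

But every time A is loaded, the following exception occurs: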
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 615 tasks (3.0 GB) is bigger than spark.driver.maxResultSize (3.0 GB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
I don't know why this happens. Does the cache operation send serialized data, such as metadata, back to the driver?
1 solution
#1
I'm hitting something similar and came across this:
I haven't been able to try the same code on 2.2.0, where the issue is supposedly fixed. If you do get a chance to try it there, let us know.
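In the meantime, one standard workaround (a plain Spark configuration setting, not specific to the fix above) is to raise the spark.driver.maxResultSize cap, or set it to 0 for unlimited at the risk of a driver OOM:

    import org.apache.spark.{SparkConf, SparkContext}

    // Raise the limit on the total size of serialized task results the
    // driver will accept (default is 1g; "0" disables the check).
    val conf = new SparkConf()
      .setAppName("join-loop") // hypothetical app name
      .set("spark.driver.maxResultSize", "8g")
    val sc = new SparkContext(conf)

The same setting can be passed at submit time with --conf spark.driver.maxResultSize=8g.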