In Spark, can caching a DataFrame cause a "serialized results too large" exception?

Date: 2022-02-01 23:12:57

I have a big table A, and two sets of tables B1, B2, ..., Bn and C1, C2, ..., Cn. My logic is: Bi join Ci join A. A is big and used many times, so I want to load A first and cache it. Then, in each loop iteration i, I load Bi and Ci and join the three tables. But here is the problem: every time A is loaded, the following exception occurs:


org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 615 tasks (3.0 GB) is bigger than spark.driver.maxResultSize (3.0 GB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)

I don't know why. Does the cache operation send serialized data, such as some metadata, to the driver?
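
For context, here is a minimal sketch of the loop described above, assuming the Spark 2.x DataFrame API; the paths, the join key "id", and n = 10 are placeholders for illustration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("loop-join").getOrCreate()

    // Load the big table A once and cache it, since every iteration reuses it.
    // Note that cache() is lazy: A is materialized the first time it is used.
    val a = spark.read.parquet("/data/A").cache()

    val n = 10 // placeholder
    for (i <- 1 to n) {
      val b = spark.read.parquet(s"/data/B$i")
      val c = spark.read.parquet(s"/data/C$i")

      // Bi join Ci join A, then write the result out, matching the
      // DataFrameWriter.parquet call at the bottom of the stack trace above.
      b.join(c, "id")
        .join(a, "id")
        .write
        .parquet(s"/output/result$i")
    }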


1 solution

#1

I'm hitting something similar, came across this:


SPARK-12837

I haven't been able to try the same code on 2.2.0, where the issue is supposedly fixed. If you do get a chance to try it there, let us know.

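In the meantime, a common stopgap on affected versions is to raise the driver-side limit named in the exception, spark.driver.maxResultSize. A minimal sketch; the 6g value is only an assumption, so size it to your own job (0 disables the check entirely, but then nothing stops accumulated task results from exhausting driver memory):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // spark.driver.maxResultSize caps the total size of serialized task
    // results sent back to the driver (default 1g). The failing job above
    // hit a 3g limit, so raise it; 6g here is a placeholder value.
    val conf = new SparkConf().set("spark.driver.maxResultSize", "6g")

    val spark = SparkSession.builder().config(conf).getOrCreate()

The same setting can be passed at submit time with --conf spark.driver.maxResultSize=6g.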
