
Time: 2021-10-15 00:41:10

Let me first inform all of you that I am very new to Spark.


I need to process a huge number of records in a table; grouped by email, there are around 1 million groups. I need to perform multiple logical calculations on the data set for each individual email and update the database based on the results.


Roughly, my code structure is as follows:


Initial Data Load ...


import sparkSession.implicits._
var tableData =, , connectionProperties).select("email").where()


// DataFrame of records grouped by email, keeping only emails whose count is greater than one


var recordsGroupedBy = tableData.groupBy("email").count().withColumnRenamed("count", "recordcount").filter("recordcount > 1").toDF()


Next comes the processing for each grouped email, using the processDataAgainstEmail() method:



Here I see that foreach is not executed in parallel. I need to invoke the method processDataAgainstEmail(,) in parallel. But if I try to parallelize by doing the following:


I can get a list by invoking


val emailList = …"email")… => r(0).asInstanceOf[String]).collect().toList


var rdd = sc.parallelize(emailList)


rdd.foreach(x => processDataAgainstEmail(x.getAs("email"), sparkSession))


This is not supported, as I cannot pass sparkSession when using parallelize.


Can anybody help me with this? Inside processDataAgainstEmail(,), multiple operations are performed: database inserts and updates, as well as Spark DataFrame and Spark SQL operations.


To summarize, I need to invoke processDataAgainstEmail(,) in parallel, with the sparkSession.


If it is not possible at all to pass the Spark session, the method won't be able to do anything on the database. I am not sure what the alternative would be, as parallelism over emails is a must for my scenario.


1 solution



foreach is a method on the list that operates on each element sequentially, so you are acting on the elements one at a time and passing each one to the processDataAgainstEmail method.


Once you have the resulting list, you invoke sc.parallelize on it to parallelize the creation of the distributed dataset (an RDD) from the list of records you created or manipulated in the previous step. The parallelization, as far as I can see in PySpark, is a property of how that dataset is created, not of acting on the result of any operation.
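To make the point about the sequential foreach concrete, here is a minimal sketch of one possible workaround: keep the calls on the driver, but submit them from a bounded thread pool using Scala Futures. It is only an illustration under assumptions. The object name DriverSideConcurrency, the processAll helper, and the thread count are hypothetical, and processDataAgainstEmail is replaced by a stub that just returns a string so the sketch runs without Spark. In a real job the stub body would use the driver's SparkSession, which Spark allows to submit jobs from multiple threads.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DriverSideConcurrency {
  // Hypothetical stand-in for the question's processDataAgainstEmail(email, sparkSession).
  // It only returns a marker string, so this sketch has no Spark dependency.
  def processDataAgainstEmail(email: String): String = s"processed:$email"

  // Runs the per-email work concurrently on the driver, using a bounded thread pool.
  // The default of 4 threads is an arbitrary assumption, not a recommendation.
  def processAll(emails: List[String], threads: Int = 4): List[String] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Future.traverse creates one Future per email and returns the results
      // in the same order as the input list.
      val all = Future.traverse(emails)(email => Future(processDataAgainstEmail(email)))
      Await.result(all, 10.minutes)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    processAll(List("a@example.com", "b@example.com")).foreach(println)
}
```

With around a million emails, this would still mean a million small driver-side invocations, so it may be worth checking whether the per-email logic can instead be expressed as DataFrame operations over the grouped data.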
