我的火花代码中有什么错误来计算每个组织的员工?

时间:2020-12-23 19:24:56

I am doing some basic handson in spark-scala

我正在做一些火花scala的基本动手

The requirement is to display number of employees in each organization.

要求是显示每个组织中的员工数量。

I have achieved the same requirement by using groupByKey and then Mapvalues and also I have achieved the same requirement by creating a keyValueRDD as Array((CTS,1) (CTS,1), (TCS,1)) and then applied reduceByKey((x,y) => x+y) on it . Both produced correct expected result.

我通过使用groupByKey然后使用Mapvalues实现了相同的要求,并且通过创建keyValueRDD作为Array((CTS,1)(CTS,1),(TCS,1))然后应用reduceByKey(() x,y)=> x + y)就可以了。两者产生了正确的预期结果

Now I am trying the below style of logic. I want to use reduceByKey ,but i dont want to have a KeyValueRDD with hardcoded value as 1 to achieve the count of employees.

现在我正在尝试以下的逻辑风格。我想使用reduceByKey,但我不希望KeyValueRDD的硬编码值为1来实现员工的数量。

Please help me on changing this below code to get the expected output . Also i would like to know why i am getting wrong output here in my code

请帮我改变下面的代码以获得预期的输出。此外,我想知道为什么我在我的代码中得到错误的输出

As reduceByKey is commutative i get different output .

由于reduceByKey是可交换的,我得到不同的输出。

scala> myList
res34: List[String] = List(100|Surender|CTS|CHN, 101|Raja|CTS|CHN, 102|Kumar|TCS|BNG)

scala> val listRDD = sc.parallelize(myList)
listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:23

scala> val mapRDD = listRDD.map(elem => elem.split("\\|"))
mapRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:25

scala> val keyValueRDD = mapRDD.map(elem => (elem(2),elem(0).toInt))
keyValueRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:27

scala> val resultRDD = keyValueRDD.reduceByKey((x,y) => { var incr = 0 ; incr+1 } )
resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:29

scala> resultRDD.collect
res36: Array[(String, Int)] = Array((TCS,102), (CTS,1)

Expected Output :

预期产出:

Array((TCS,1), (CTS,2)

1 个解决方案

#1


0  

Even though the question explicitly states It doesn't want to map over hardcoded value of 1 that is absolutely the right way to go as shown below.

即使问题明确指出它不想映射硬编码值1,这绝对是正确的方法,如下所示。

scala> keyValueRDD.map({case (x,y) => x -> 1 }).reduceByKey(_ + _).collect()
res46: Array[(String, Int)] = Array((TCS,1), (CTS,2))

If you understand how spark works you are never supposed to write imperative code such as this { var incr = 0 ; incr+1 } where a function lambda is expected.

如果你理解了spark是如何工作的,你就不应该编写命令式代码,比如这个{var incr = 0; incr + 1}其中函数lambda是预期的。

reduceByKey is supposed to take two arguments of an accumulator and the current value being reduced and it must return the new value of the accumulator. In your code you always return 1 since incr variable is instantiated to 0 for every value being reduced. hence the accumulator value always remains as 1. This explains why the CTS has the value of 1 in your flawed result.

reduceByKey应该采用累加器的两个参数,当前值减小,并且必须返回累加器的新值。在代码中,总是返回1,因为对于每个被减少的值,incr变量被实例化为0。因此累加器值始终保持为1.这解释了为什么CTS在您的缺陷结果中的值为1。

For TCS since spark sees that the key TCS has only a single record, it doesnt need to reduce it any further and hence returns its original value.

对于TCS,因为spark看到密钥TCS只有一条记录,所以它不需要再进一步减少它,因此返回其原始值。

#1


0  

Even though the question explicitly states It doesn't want to map over hardcoded value of 1 that is absolutely the right way to go as shown below.

即使问题明确指出它不想映射硬编码值1,这绝对是正确的方法,如下所示。

scala> keyValueRDD.map({case (x,y) => x -> 1 }).reduceByKey(_ + _).collect()
res46: Array[(String, Int)] = Array((TCS,1), (CTS,2))

If you understand how spark works you are never supposed to write imperative code such as this { var incr = 0 ; incr+1 } where a function lambda is expected.

如果你理解了spark是如何工作的,你就不应该编写命令式代码,比如这个{var incr = 0; incr + 1}其中函数lambda是预期的。

reduceByKey is supposed to take two arguments of an accumulator and the current value being reduced and it must return the new value of the accumulator. In your code you always return 1 since incr variable is instantiated to 0 for every value being reduced. hence the accumulator value always remains as 1. This explains why the CTS has the value of 1 in your flawed result.

reduceByKey应该采用累加器的两个参数,当前值减小,并且必须返回累加器的新值。在代码中,总是返回1,因为对于每个被减少的值,incr变量被实例化为0。因此累加器值始终保持为1.这解释了为什么CTS在您的缺陷结果中的值为1。

For TCS since spark sees that the key TCS has only a single record, it doesnt need to reduce it any further and hence returns its original value.

对于TCS,因为spark看到密钥TCS只有一条记录,所以它不需要再进一步减少它,因此返回其原始值。