Unable to count words in Spark using reduceByKey((v1,v2) => v1 + v2)

Time: 2022-06-27 23:09:52

I just started learning Spark. I am running Spark in standalone mode and trying to do a word count in Scala. The issue I have observed is that reduceByKey() is not grouping the words as expected: an empty array is printed. The steps I followed are as follows.

Create a text file containing some words separated by spaces. In the Spark shell, I execute the commands below.

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf

scala> import scala.io.Source
import scala.io.Source

val conf = new SparkConf().setAppName("hello")
val sc = new SparkContext(conf)

scala> val textFile = sc.textFile("file:///goutham/tweet.txt")
15/09/20 04:00:32 INFO storage.MemoryStore: ensureFreeSpace(250576) called with curMem=277327, maxMem=280248975
15/09/20 04:00:32 INFO storage.MemoryStore: Block broadcast_48 stored as values in memory (estimated size 244.7 KB, free 266.8 MB)
15/09/20 04:00:32 INFO storage.MemoryStore: ensureFreeSpace(25159) called with curMem=527903, maxMem=280248975
15/09/20 04:00:32 INFO storage.MemoryStore: Block broadcast_48_piece0 stored as bytes in memory (estimated size 24.6 KB, free 266.7 MB)
15/09/20 04:00:32 INFO storage.BlockManagerInfo: Added broadcast_48_piece0 in memory on localhost:50471 (size: 24.6 KB, free: 267.2 MB)
15/09/20 04:00:32 INFO spark.SparkContext: Created broadcast 48 from textFile at <console>:29
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[46] at textFile at <console>:29


scala> val wc = textFile.flatMap(line => line.split(" ")).map( word =>(word,1)).cache()
wc: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at map at <console>:31

scala> wc.collect()
res26: Array[(String, Int)] = Array((one,1), (two,1), (three,1), (one,1), (seven,1), (ten,1))

scala> var output = wc.reduceByKey((v1,v2) => v1 + v2).collect().foreach(println)
15/09/20 04:06:59 INFO storage.BlockManagerInfo: Removed broadcast_49_piece0 on localhost:50471 in memory (size: 1955.0 B, free: 267.2 MB)
15/09/20 04:06:59 INFO spark.ContextCleaner: Cleaned shuffle 20
15/09/20 04:06:59 INFO storage.BlockManagerInfo: Removed broadcast_50_piece0 on localhost:50471 in memory (size: 2.2 KB, free: 267.2 MB)
15/09/20 04:06:59 INFO storage.BlockManagerInfo: Removed broadcast_51_piece0 on localhost:50471 in memory (size: 1369.0 B, free: 267.2 MB)

output: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[50] at reduceByKey at <console>:39

scala> output.collect()
15/09/20 04:09:03 INFO spark.SparkContext: Starting job: collect at <console>:42
15/09/20 04:09:03 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 21 is 143 bytes
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Got job 30 (collect at <console>:42) with 1 output partitions (allowLocal=false)
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Final stage: ResultStage 54(collect at <console>:42)
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 53)
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Missing parents: List()
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Submitting ResultStage 54 (ShuffledRDD[50] at reduceByKey at <console>:39), which has no missing parents
15/09/20 04:09:03 INFO storage.MemoryStore: ensureFreeSpace(2304) called with curMem=563738, maxMem=280248975
15/09/20 04:09:03 INFO storage.MemoryStore: Block broadcast_54 stored as values in memory (estimated size 2.3 KB, free 266.7 MB)
15/09/20 04:09:03 INFO storage.MemoryStore: ensureFreeSpace(1366) called with curMem=566042, maxMem=280248975
15/09/20 04:09:03 INFO storage.MemoryStore: Block broadcast_54_piece0 stored as bytes in memory (estimated size 1366.0 B, free 266.7 MB)
15/09/20 04:09:03 INFO storage.BlockManagerInfo: Added broadcast_54_piece0 in memory on localhost:50471 (size: 1366.0 B, free: 267.2 MB)
15/09/20 04:09:03 INFO spark.SparkContext: Created broadcast 54 from broadcast at DAGScheduler.scala:874
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 54 (ShuffledRDD[50] at reduceByKey at <console>:39)
15/09/20 04:09:03 INFO scheduler.TaskSchedulerImpl: Adding task set 54.0 with 1 tasks
15/09/20 04:09:03 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 54.0 (TID 53, localhost, PROCESS_LOCAL, 1165 bytes)
15/09/20 04:09:03 INFO executor.Executor: Running task 0.0 in stage 54.0 (TID 53)
15/09/20 04:09:03 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/09/20 04:09:03 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/09/20 04:09:03 INFO executor.Executor: Finished task 0.0 in stage 54.0 (TID 53). 882 bytes result sent to driver
15/09/20 04:09:03 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 54.0 (TID 53) in 3 ms on localhost (1/1)
15/09/20 04:09:03 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 54.0, whose tasks have all completed, from pool 
15/09/20 04:09:03 INFO scheduler.DAGScheduler: ResultStage 54 (collect at <console>:42) finished in 0.004 s
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Job 30 finished: collect at <console>:42, took 0.047307 s
res29: Array[(String, Int)] = Array()

==>> Here I am not getting the expected output. Could anyone please let me know where I made the mistake?
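For reference, the grouping the question expects can be reproduced without Spark. The sketch below is only a local analogue of reduceByKey built from plain Scala collections; the names ExpectedCounts and reduceLocally are hypothetical, and the input is the six pairs shown by wc.collect() above.

```scala
// Local analogue of reduceByKey, using only the Scala standard library.
object ExpectedCounts {
  // The six pairs printed by wc.collect() in the question.
  val pairs: Seq[(String, Int)] = Seq(
    ("one", 1), ("two", 1), ("three", 1),
    ("one", 1), ("seven", 1), ("ten", 1))

  // Group the pairs by word, then combine each group's values with the
  // same function passed to reduceByKey: (v1, v2) => v1 + v2.
  def reduceLocally(in: Seq[(String, Int)]): Map[String, Int] =
    in.groupBy(_._1).map { case (word, group) =>
      word -> group.map(_._2).reduce((v1, v2) => v1 + v2)
    }

  def main(args: Array[String]): Unit =
    // Sort by word so the printed order is stable.
    reduceLocally(pairs).toSeq.sortBy(_._1).foreach(println)
}
```

With this input, "one" maps to 2 and every other word maps to 1, which is the result the Spark job should also return.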

PS: I also tried the following steps, but I am still unable to get the word count.

scala> val wc = textFile.flatMap(line => line.split(" ")).map( word => (word,1)).cache()
scala> val output = wc.reduceByKey((v1,v2) => v1 + v2).collect()
15/09/20 06:59:06 INFO spark.SparkContext: Starting job: collect at <console>:25
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Registering RDD 3 (map at <console>:23)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Got job 3 (collect at <console>:25) with 1 output partitions (allowLocal=false)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 7(collect at <console>:25)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 6)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 6)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 6 (MapPartitionsRDD[3] at map at <console>:23), which has no missing parents
15/09/20 06:59:06 INFO storage.MemoryStore: ensureFreeSpace(4112) called with curMem=286320, maxMem=280248975
15/09/20 06:59:06 INFO storage.MemoryStore: Block broadcast_7 stored as values in memory (estimated size 4.0 KB, free 267.0 MB)
15/09/20 06:59:06 INFO storage.MemoryStore: ensureFreeSpace(2315) called with curMem=290432, maxMem=280248975
15/09/20 06:59:06 INFO storage.MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 2.3 KB, free 267.0 MB)
15/09/20 06:59:06 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:46205 (size: 2.3 KB, free: 267.2 MB)
15/09/20 06:59:06 INFO spark.SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:874
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 6 (MapPartitionsRDD[3] at map at <console>:23)
15/09/20 06:59:06 INFO scheduler.TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
15/09/20 06:59:06 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, PROCESS_LOCAL, 1385 bytes)
15/09/20 06:59:06 INFO executor.Executor: Running task 0.0 in stage 6.0 (TID 6)
15/09/20 06:59:06 INFO storage.BlockManager: Found block rdd_3_0 locally
15/09/20 06:59:06 INFO executor.Executor: Finished task 0.0 in stage 6.0 (TID 6). 2056 bytes result sent to driver
15/09/20 06:59:06 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 59 ms on localhost (1/1)
15/09/20 06:59:06 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 
15/09/20 06:59:06 INFO scheduler.DAGScheduler: ShuffleMapStage 6 (map at <console>:23) finished in 0.055 s
15/09/20 06:59:06 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/09/20 06:59:06 INFO scheduler.DAGScheduler: running: Set()
15/09/20 06:59:06 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 7)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: failed: Set()
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Missing parents for ResultStage 7: List()
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Submitting ResultStage 7 (ShuffledRDD[7] at reduceByKey at <console>:25), which is now runnable
15/09/20 06:59:06 INFO storage.MemoryStore: ensureFreeSpace(2288) called with curMem=292747, maxMem=280248975
15/09/20 06:59:06 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 2.2 KB, free 267.0 MB)
15/09/20 06:59:06 INFO storage.MemoryStore: ensureFreeSpace(1368) called with curMem=295035, maxMem=280248975
15/09/20 06:59:06 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 1368.0 B, free 267.0 MB)
15/09/20 06:59:06 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:46205 (size: 1368.0 B, free: 267.2 MB)
15/09/20 06:59:06 INFO spark.SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:874
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 7 (ShuffledRDD[7] at reduceByKey at <console>:25)
15/09/20 06:59:06 INFO scheduler.TaskSchedulerImpl: Adding task set 7.0 with 1 tasks
15/09/20 06:59:06 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 7.0 (TID 7, localhost, PROCESS_LOCAL, 1165 bytes)
15/09/20 06:59:06 INFO executor.Executor: Running task 0.0 in stage 7.0 (TID 7)
15/09/20 06:59:06 INFO spark.MapOutputTrackerMaster: Don't have map outputs for shuffle 3, fetching them
15/09/20 06:59:06 INFO spark.MapOutputTrackerMaster: Doing the fetch; tracker endpoint = AkkaRpcEndpointRef(Actor[akka://sparkDriver/user/MapOutputTracker#194665441])
15/09/20 06:59:06 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to localhost:45959
15/09/20 06:59:06 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 3 is 82 bytes
15/09/20 06:59:06 INFO spark.MapOutputTrackerMaster: Got the output locations
15/09/20 06:59:06 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/09/20 06:59:06 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/09/20 06:59:06 INFO executor.Executor: Finished task 0.0 in stage 7.0 (TID 7). 882 bytes result sent to driver
15/09/20 06:59:06 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 7.0 (TID 7) in 19 ms on localhost (1/1)
15/09/20 06:59:06 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks have all completed, from pool 
15/09/20 06:59:06 INFO scheduler.DAGScheduler: ResultStage 7 (collect at <console>:25) finished in 0.015 s
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Job 3 finished: collect at <console>:25, took 0.173682 s
output: Array[(String, Int)] = Array()

scala> output foreach println

scala> 

2 answers

#1


2  

The statement var output = wc.reduceByKey((v1,v2) => v1 + v2).collect().foreach(println) already prints the array you want, and it is wrong to collect output again, because output is Unit. If you want the result of reduceByKey as a local array, you should only collect your RDD. In this case your RDD is wc.reduceByKey((v1,v2) => v1 + v2), so try this: var output = wc.reduceByKey((v1,v2) => v1 + v2).collect()
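The Unit behaviour described here is ordinary Scala and can be seen without Spark. In this minimal sketch the array counts is a made-up stand-in for what collect() would return, and UnitDemo is a hypothetical name.

```scala
// Plain Scala, no Spark needed: the same mistake on a local Array.
object UnitDemo {
  // Hypothetical stand-in for the array returned by collect().
  val counts: Array[(String, Int)] = Array(("one", 2), ("two", 1))

  def main(args: Array[String]): Unit = {
    // foreach runs println for its side effect and returns Unit,
    // so `printed` holds () rather than the array.
    val printed: Unit = counts.foreach(println)

    // Keeping the array itself lets it be printed and reused later.
    val output: Array[(String, Int)] = counts
    output.foreach(println)
    println(output.length)
  }
}
```

Since Unit carries no data, nothing useful can be done with `printed` afterwards, which is why the assignment should stop at collect().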

#2


1  

The problem is that your output is assigned the result of foreach(println), which returns Unit. If you want the result printed, you should either just do that directly, like:

wc.reduceByKey((v1,v2) => v1 + v2).collect().foreach(println)

or assign the collected result to output and then do a println afterwards, like:

val output = wc.reduceByKey((v1,v2) => v1 + v2).collect()
output foreach println
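Putting both suggestions together, a complete session might look like the sketch below. It is meant for spark-shell and assumes the sc that the shell already provides, rather than a second SparkContext built by hand (Spark expects only one active context per JVM). The two input lines are made up so the snippet does not depend on the file from the question; sc.textFile("file:///goutham/tweet.txt") could be substituted for sc.parallelize.

```scala
// Run inside spark-shell, where `sc` is already defined.
// Hypothetical stand-in for the contents of the text file.
val lines = sc.parallelize(Seq("one two three", "one seven ten"))

val counts = lines
  .flatMap(line => line.split(" "))   // split each line into words
  .map(word => (word, 1))             // pair every word with a count of 1
  .reduceByKey((v1, v2) => v1 + v2)   // still an RDD[(String, Int)]

// collect() brings the pairs to the driver as a local array.
val output: Array[(String, Int)] = counts.collect()

// Printing is a separate step, so `output` stays usable afterwards.
output.foreach(println)
```

The order of the printed pairs is not guaranteed, since it depends on how the keys are partitioned.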
