I just started learning Spark programming and Python programming. Can you please help me understand the mistake in my code?
I am running the code in a Jupyter notebook, in interactive mode.
-
The test code below, where I tried out the concept, works fine:
rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),
                      ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))])
result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]),
                                       (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]),
                                       (x[1][3]+y[1][3]), (x[1][4]+y[1][4]), (x[1][5]+y[1][5]),
                                       (x[1][6]+y[1][6]), (x[1][7]+y[1][7]), (x[1][8]+y[1][8]),
                                       (x[1][9]+y[1][9]), (x[1][10]+y[1][10]), (x[1][11]+y[1][11])))
print(result.top(3))
Output:
[('librarian', (2, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0))]
-
Below also works fine:
#[(movieid, genre_list)]
aggregateRDD = movieRDD.reduceByKey(lambda x, y: ((x[0]+y[0]),
                                                  (x[1][0]+y[1][0]), (x[1][1]+y[1][1])))
print(aggregateRDD.top(3))
Output:
[(1682, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1681, [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1680, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])]
-
But when I use a similar concept in my program, it fails. What am I doing wrong?
##############################################################################
### Analysis of Movie Ratings percentages across Occupation and Movie Genre
##############################################################################
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("popularMovie")
sc = SparkContext(conf=conf)

### import movie ratings into RDD
ratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")
### import user details into RDD
userLines = sc.textFile("///SparkCourse/ml-100k/u.user")
### import movie data into RDD
movieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")
### import genre data into RDD
genreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")

### split-on-delimiter functions
def splitRatingTab(line):
    line = line.split('\t')
    return (int(line[0]), int(line[1]), int(line[2]))  # (movieid, user, rating)

def splitUserPipe(line):
    line = line.split('|')
    return (int(line[0]), line[3])  # (user, occupation)

def splitMoviePipe(line):
    line = line.split('|')
    return (int(line[0]), list(listToIntElements(line[5:])))  # (movieid, genre_list[])

def listToIntElements(lst):
    """convert the boolean text ('0', '1') genre values to integers (0, 1)"""
    for cnt, _ in enumerate(lst):
        lst[cnt] = int(_)
    return lst

### create dictionary object for genreid and genre
def loadMovieGenre():
    """
    create dictionary object for genreid and genre
    """
    genre = {}
    with open('C:/SparkCourse/ml-100k/u.genre') as file:
        for line in file:
            # each line is of the form [genre, genreid]
            line = line.split('|')
            # convert genreid to int, which also removes the newline '\n' at the end of the string
            genre[int(line[1])] = line[0]
    return genre

### Transform to RDD as [(movieid, user, rating)] for movies which are reviewed by viewers
ratingRDD = ratingLines.map(lambda line: splitRatingTab(line))

### Transform to RDD as [(user, occupation)]
occupationRDD = userLines.map(splitUserPipe)

### Transform to RDD as [(movieid, genre_list)]; genre is a boolean value, and a movie can be in multiple genres
movieRDD = movieLines.map(splitMoviePipe)

### join the transformed rating RDD [(movieid, (user, rating))] with movieRDD [(movieid, genre)] to get all genres;
### then transform to [(movieid, ((userid, rating), genre))]
joinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)

### Transform joinRatingMovieGenres to RDD [(userid, (rating, genre))] and join with occupationRDD [(userid, occupation)]
### to transform to [(occupation, (1, genre))]
transRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))
joinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1])))
print(joinRatingGenresOccup.take(2))

### Transform by aggregating the ratingCount and genreCount to [(occupation, (totalRatings, {cntGenresRating}))]
totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]),
    (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][12]), (x[1][3]+y[1][3])))
print(totalRatingGenreCntByOccupation.take(2))
Error:
[('librarian', (1, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))]
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-137-a156d8bbfde9> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', '<the %%time cell containing the program shown above>')

C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
-> 2115             result = fn(magic_arg_s, cell)

<decorator-gen-60> in time(self, line, cell, local_ns)

C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
--> 188     call = lambda f, *a, **k: f(*a, **k)

C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
-> 1185             exec(code, glob, local_ns)

<timed exec> in <module>()

C:\spark\python\pyspark\rdd.py in take(self, num)
-> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)

C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
-> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)

C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
-> 1160             answer, self.gateway_client, self.target_id, self.name)

C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
---> 63             return f(*a, **kw)

C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
--> 320                 format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 239.0 failed 1 times,
most recent failure: Lost task 1.0 in stage 239.0 (TID 447, localhost, executor driver):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\python\pyspark\rdd.py", line 362, in func
    return f(iterator)
  File "C:\spark\python\pyspark\rdd.py", line 1857, in combineLocally
    merger.mergeValues(iterator)
  File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
  File "<timed exec>", line 73, in <lambda>
TypeError: 'int' object is not subscriptable

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
        at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
        at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
        at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  <same Python worker traceback as above>
TypeError: 'int' object is not subscriptable
        ... 1 more
1 Answer
#1
When you reduceByKey, you have to return the same structure you received; otherwise, the next time the function meets another value for the same key and tries to reduce it, it will not work.
You tested with only two elements, so you haven't seen it, but if you try with three:
rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),\
('librarian', (1, [0, 1, 0, 0]))])
result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]),\
(x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]) ))
.....
  File "/home/hado/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1784, in _mergeCombiners
    merger.mergeCombiners(iterator)
  File "/home/hado/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/shuffle.py", line 272, in mergeCombiners
    d[k] = comb(d[k], v) if k in d else v
  File "", line 3, in
TypeError: 'int' object has no attribute '__getitem__'
.....
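To make the failure concrete, here is a small hand-worked sketch of the failing reduce (assuming the merges happen left to right):

x = (1, [0, 0, 1, 0])
y = (1, [0, 1, 0, 0])
# The lambda returns a flat tuple of ints instead of the (count, [list]) structure it received:
merged = (x[0] + y[0], x[1][0] + y[1][0], x[1][1] + y[1][1],
          x[1][2] + y[1][2], x[1][3] + y[1][3])
print(merged)   # (2, 0, 1, 1, 0)
# On the next merge x is (2, 0, 1, 1, 0), so x[1] is the int 0 and
# x[1][0] raises the TypeError shown above.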
The right way to do the reduceByKey in your code is to return the same structure: a tuple of a value and a list of the same size:
rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),\
('librarian', (1, [0, 1, 0, 0]))])
result = rdd.reduceByKey(lambda x, y: ( x[0] + y[0],\
[x[1][0]+y[1][0], x[1][1]+y[1][1], x[1][2]+y[1][2], x[1][3]+y[1][3] ] ))
print (result.collect())
[('librarian', (3, [0, 2, 1, 0]))]
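Applied to the code in the question, the same fix could look roughly like this (a sketch, not run against the full dataset; zipping the two genre lists keeps the element-wise sums as a list and avoids spelling out all 19 genre indices by hand):

totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(
    lambda x, y: (x[0] + y[0],                            # total rating count
                  [a + b for a, b in zip(x[1], y[1])]))   # element-wise genre counts, still a list
print(totalRatingGenreCntByOccupation.take(2))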
You can also do a combineByKey, as explained here: `combineByKey`, pyspark
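For reference, a minimal combineByKey sketch of the same aggregation (assuming the values are (1, genre_list) pairs, as in joinRatingGenresOccup above):

totalRatingGenreCntByOccupation = joinRatingGenresOccup.combineByKey(
    lambda v: (v[0], list(v[1])),                          # createCombiner: start from a copy of the (count, list) value
    lambda acc, v: (acc[0] + v[0],                         # mergeValue: fold another value into the combiner
                    [a + b for a, b in zip(acc[1], v[1])]),
    lambda a, b: (a[0] + b[0],                             # mergeCombiners: merge combiners from different partitions
                  [c + d for c, d in zip(a[1], b[1])]))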
Also note that "(x[1][2]+y[1][12])" (in the second-to-last line) looks like a typo.