WordCount is the classic example program for Hadoop. The environment used here is cloudera-quickstart-vm-5.5.0-0-vmware.
Number of mappers and reducers
By default, Hadoop launches one mapper per data block (input split). For reducers, a common rule of thumb is about one per compute core, which keeps the job parallel so that every compute unit has work to do.
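For hadoop-streaming jobs, the reducer count can be set explicitly in two common ways: the -numReduceTasks option used later in this article, or a generic -D option placed before the streaming options (a sketch; mapreduce.job.reduces is the Hadoop 2.x property name, mapred.reduce.tasks the older one):

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.reduces=4 \
    ... (other options as in the examples below)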
Computation time vs. number of output files
Each reducer produces one output file, so the more reducers, the more result files; we need to trade off computation time against the number of output files.
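If the extra part files become inconvenient, they can be merged after the job finishes; for example, hdfs dfs -getmerge concatenates every part file in an output directory into a single local file:

hdfs dfs -getmerge /user/cloudera/output_new merged_output.txt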
Example: setting 2 reducers
For example, we use hadoop-streaming to run a simple WordCount.
Command:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/cloudera/input \
-output /user/cloudera/output_new \
-mapper /home/cloudera/wordcount_mapper.py \
-reducer /home/cloudera/wordcount_reducer.py
The scripts wordcount_mapper.py and wordcount_reducer.py are listed in the appendix at the end of this article.
The directory /user/cloudera/input contains two simple text files:
testfile1: "A long time ago in a galaxy far far away"
testfile2: "Another episode of Star Wars"
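For reference, the input directory can be prepared like this (a sketch; file contents as above, paths assumed to match the commands in this article):

echo "A long time ago in a galaxy far far away" > testfile1
echo "Another episode of Star Wars" > testfile2
hdfs dfs -mkdir -p /user/cloudera/input
hdfs dfs -put testfile1 testfile2 /user/cloudera/input/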
When the numReduceTasks parameter is not set, the number of reducers defaults to 1, and a single output file is produced:
A 1
Another 1
Star 1
Wars 1
a 1
ago 1
away 1
episode 1
far 2
galaxy 1
in 1
long 1
of 1
time 1
Note that all of the words have been sorted, with uppercase words coming before lowercase ones.
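This is because keys are compared as raw bytes, and in ASCII every uppercase letter sorts before every lowercase letter. A quick Python check shows the same ordering:

>>> sorted(["a", "A", "ago", "Another", "Star"])
['A', 'Another', 'Star', 'a', 'ago']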
When we set the parameter numReduceTasks to 2:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/cloudera/input \
-output /user/cloudera/output_new_0 \
-mapper /home/cloudera/wordcount_mapper.py \
-reducer /home/cloudera/wordcount_reducer.py \
-numReduceTasks 2
This produces two output files:
[part-00000]
A 1
Another 1
Wars 1
a 1
ago 1
episode 1
far 2
in 1
of 1
time 1
[part-00001]
Star 1
away 1
galaxy 1
long 1
Note that each of these two files is itself sorted, but which output file a given word ends up in looks arbitrary: it is decided by the default partitioner, which hashes the key, so the assignment is in fact deterministic.
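A minimal Python sketch of the idea behind the default partitioner (Hadoop's HashPartitioner actually uses the Java hashCode of the key; this is only an analogy, not the exact function):

def partition(key, num_reducers):
    # a stand-in for Hadoop's HashPartitioner: the same key always
    # lands in the same partition, hence in the same output file
    # (note: Python 3 salts str hashes per process; Java's hashCode,
    # which Hadoop actually uses, is stable across runs)
    return (hash(key) & 0x7fffffff) % num_reducers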
What happens if we set the number of reducers to 0?
Command:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/cloudera/input \
-output /user/cloudera/output_new_0 \
-mapper /home/cloudera/wordcount_mapper.py \
-reducer /home/cloudera/wordcount_reducer.py \
-numReduceTasks 0
With 0 reducers, no reducer program runs at all: the shuffle/sort phase is skipped and the mappers' intermediate output is written directly to the output files!
[part-00000]
A 1
long 1
time 1
ago 1
in 1
a 1
galaxy 1
far 1
far 1
away 1
[part-00001]
Another 1
episode 1
of 1
Star 1
Wars 1
Note that the two part files are simply the mapper outputs of the two input files, still in their original word order.
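Incidentally, streaming scripts like these can be tested without a cluster by piping the input through the mapper, a sort (standing in for Hadoop's shuffle/sort phase), and the reducer (a sketch; assumes python is on the PATH and the files are in the current directory):

cat testfile1 testfile2 | python wordcount_mapper.py | sort | python wordcount_reducer.py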
Appendix:
wordcount_mapper.py
#!/usr/bin/env python
# the above just indicates to use python to interpret this file
# ---------------------------------------------------------------
# This mapper code will input a line of text and output <word, 1>
# ---------------------------------------------------------------
import sys  # a python module with system functions for this OS

# ------------------------------------------------------------
# this 'for loop' will set 'line' to an input line from the
# system standard input file
# ------------------------------------------------------------
for line in sys.stdin:
    # -----------------------------------
    # sys.stdin calls 'sys' to read a line from standard input;
    # note that 'line' is a string object, ie variable, and it has
    # methods that you can apply to it, as in the next line
    # -----------------------------------
    line = line.strip()  # strip is a method, ie function, associated
                         # with a string variable; it strips
                         # the trailing newline (by default)
    keys = line.split()  # split line at blanks (by default),
                         # and return a list of keys
    for key in keys:     # a for loop through the list of keys
        value = 1
        print('{0}\t{1}'.format(key, value))  # {0} and {1} are replaced by the 0th and 1st
                                              # items in the format list; note that the Hadoop
                                              # default is a 'tab' separating key from value
wordcount_reducer.py
#!/usr/bin/env python
# ---------------------------------------------------------------
# This reducer code will input a line of text and
# output <word, total-count>
# ---------------------------------------------------------------
import sys

last_key = None      # initialize these variables
running_total = 0

# -----------------------------------
# Loop thru file
# -----------------------------------
for input_line in sys.stdin:
    input_line = input_line.strip()
    # --------------------------------
    # Get Next Word
    # --------------------------------
    this_key, value = input_line.split("\t", 1)  # the Hadoop default is that a tab separates
                                                 # key from value; split returns a list of
                                                 # strings, here unpacked into 2 variables
    value = int(value)  # int() will convert a string to an integer
                        # (this program does no error checking)
    # ---------------------------------
    # Key Check part
    # if this current key is the same as the last one, consolidate;
    # otherwise emit
    # ---------------------------------
    if last_key == this_key:    # check if key has changed ('==' is a logical equality check)
        running_total += value  # add value to running total
    else:
        if last_key:  # if this key that was just read in is different, and the previous
                      # (ie last) key is not empty, then output the previous <key, running-count>
            print("{0}\t{1}".format(last_key, running_total))  # hadoop expects tab (ie '\t') separation
        running_total = value   # reset values
        last_key = this_key

if last_key == this_key:
    print("{0}\t{1}".format(last_key, running_total))
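Note that this reducer only works because Hadoop sorts the mapper output by key before the reduce phase, so all lines for the same word arrive consecutively; a single last_key comparison is enough to detect key boundaries.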