Cloudera Hadoop MapReduce – setting the number of reducers to 2 in the wordcount example

Date: 2021-04-12 06:24:26

Wordcount is the classic example program for Hadoop. The environment used here is cloudera-quickstart-vm-5.5.0-0-vmware.

Number of mappers and reducers

By default, Hadoop launches one mapper per data block; for reducers, the usual guideline is one per compute core. This keeps the computation parallel, i.e. every compute unit has work to do.

Computation time vs. number of output files

Each reducer produces one output file, so the more reducers you run, the more result files you get; you therefore have to trade computation time off against the number of result files.
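Besides the streaming-specific -numReduceTasks flag used below, the reducer count can also be set through Hadoop's generic -D option; a minimal sketch, assuming the Hadoop 2.x property name mapreduce.job.reduces and a hypothetical output directory (generic options must come before the streaming options):

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.reduces=2 \
-input /user/cloudera/input \
-output /user/cloudera/output_d \
-mapper /home/cloudera/wordcount_mapper.py \
-reducer /home/cloudera/wordcount_reducer.py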

Example: setting 2 reducers

As an example, we run a simple wordcount with hadoop-streaming.

Command:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/cloudera/input \
-output /user/cloudera/output_new \
-mapper /home/cloudera/wordcount_mapper.py \
-reducer /home/cloudera/wordcount_reducer.py

The scripts wordcount_mapper.py and wordcount_reducer.py are listed in the appendix at the end of this article.

The directory /user/cloudera/input contains two simple text files:

testfile1: "A long time ago in a galaxy far far away"

testfile2: "Another episode of Star Wars"
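To reproduce the setup, the files can be created locally and copied into HDFS with the standard hadoop fs commands, roughly as follows (paths as used in this article):

echo "A long time ago in a galaxy far far away" > testfile1
echo "Another episode of Star Wars" > testfile2
hadoop fs -mkdir -p /user/cloudera/input
hadoop fs -put testfile1 testfile2 /user/cloudera/input
# the streaming scripts are invoked directly, so they must be executable
chmod +x /home/cloudera/wordcount_mapper.py /home/cloudera/wordcount_reducer.py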

When the numReduceTasks parameter is not set, the number of reducers defaults to 1.

A single result file is produced:

A        1
Another  1
Star     1
Wars     1
a        1
ago      1
away     1
episode  1
far      2
galaxy   1
in       1
long     1
of       1
time     1

Note that all the words have been sorted, uppercase before lowercase: the keys are compared byte by byte, and uppercase ASCII letters (65–90) sort before lowercase ones (97–122).
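The same ordering can be reproduced in plain Python, which also compares strings by character code:

words = ["a", "A", "ago", "Another", "Star"]
print(sorted(words))   # ['A', 'Another', 'Star', 'a', 'ago']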

 

When we set the parameter numReduceTasks to 2:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/cloudera/input \
-output /user/cloudera/output_new_2 \
-mapper /home/cloudera/wordcount_mapper.py \
-reducer /home/cloudera/wordcount_reducer.py \
-numReduceTasks 2
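The output directory can then be inspected directly in HDFS (paths as in the command above):

hadoop fs -ls /user/cloudera/output_new_2
hadoop fs -cat /user/cloudera/output_new_2/part-00000
hadoop fs -cat /user/cloudera/output_new_2/part-00001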

We get two output files:

[part-00000]

A        1
Another  1
Wars     1
a        1
ago      1
episode  1
far      2
in       1
of       1
time     1


[part-00001]

Star     1
away     1
galaxy   1
long     1

Observe that each of the two files is internally sorted, but which result file a given word lands in looks arbitrary. It is in fact deterministic: the partitioner decides, and by default it hashes the key and takes the remainder modulo the number of reducers, so all occurrences of the same word reach the same reducer.
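A minimal sketch of that idea in Python (Hadoop's real HashPartitioner uses Java's hashCode, so the concrete assignment differs, but the mechanism is the same):

def partition(key, num_reducers=2):
    # every occurrence of the same key hashes to the same reducer,
    # which is what makes per-key aggregation possible
    return hash(key) % num_reducers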

 

What happens if we set the number of reducers to 0?

Command:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/cloudera/input \
-output /user/cloudera/output_new_0 \
-mapper /home/cloudera/wordcount_mapper.py \
-reducer /home/cloudera/wordcount_reducer.py \
-numReduceTasks 0

With the count set to 0 the job becomes map-only: the reducer program is never executed (the -reducer option is simply ignored), and the mapper's intermediate output is written straight to HDFS without any shuffle or sort!

[part-00000]

A       1
long    1
time    1
ago     1
in      1
a       1
galaxy  1
far     1
far     1
away    1

 

[part-00001]

Another  1
episode  1
of       1
Star     1
Wars     1

Observe that these two files are simply the raw outputs of the two mappers, one per input file.
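Incidentally, the whole map-sort-reduce pipeline can be simulated in the shell before submitting anything to the cluster, which is a convenient way to debug the two scripts (local file names as above):

cat testfile1 testfile2 | python /home/cloudera/wordcount_mapper.py | sort | python /home/cloudera/wordcount_reducer.py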


Appendix:

wordcount_mapper.py

#!/usr/bin/env python
# the line above tells the OS to interpret this file with python

# ---------------------------------------------------------------
# This mapper code reads lines of text and outputs <word, 1> pairs
# ---------------------------------------------------------------

import sys  # a python module with system functions for this OS

# ------------------------------------------------------------
# this for loop sets 'line' to each input line read from
# standard input
# ------------------------------------------------------------
for line in sys.stdin:
    line = line.strip()    # strip() removes leading/trailing
                           # whitespace, including the newline
    keys = line.split()    # split the line at blanks (the default)
                           # and return a list of words
    for key in keys:       # loop through the list of words
        value = 1
        print('{0}\t{1}'.format(key, value))
        # Hadoop's default is a tab separating the key from the value
wordcount_reducer.py

#!/usr/bin/env python

# ---------------------------------------------------------------
# This reducer code reads <word, count> lines and outputs
#    <word, total-count>
# ---------------------------------------------------------------

import sys

last_key      = None   # initialize these variables
running_total = 0

# -----------------------------------
# Loop through the (key-sorted) input
# -----------------------------------
for input_line in sys.stdin:
    input_line = input_line.strip()

    # --------------------------------
    # Get the next word
    # --------------------------------
    this_key, value = input_line.split("\t", 1)  # Hadoop's default is a tab
                                                 # separating key and value
    value = int(value)   # convert the count from string to integer
                         # (this program does no error checking)

    # ---------------------------------
    # Key check:
    #    if the current key is the same as the last one, consolidate;
    #    otherwise emit the previous pair
    # ---------------------------------
    if last_key == this_key:      # '==' is the logical equality check
        running_total += value    # same word: add value to the running total
    else:
        if last_key:              # a new word arrived and the previous
                                  # key is not empty, so output the
                                  # previous <key, running-count> pair
            print("{0}\t{1}".format(last_key, running_total))
                                  # hadoop expects tab ('\t') separation
        running_total = value     # reset the total for the new word
        last_key = this_key

if last_key:                      # flush the final key after the loop
    print("{0}\t{1}".format(last_key, running_total))
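As a quick sanity check, the reducer can be fed a small, already-sorted sample on the command line; it should print "away 1" followed by "far 2":

printf "away\t1\nfar\t1\nfar\t1\n" | python wordcount_reducer.py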