I have finally finished my first MapReduce program. Given a set of vectors in an input file, it computes the distance between a new vector and each vector in the file, then sorts the results by distance from smallest to largest. The next step is to apply it to a program that searches for similar vectors in high-dimensional data.
Looking back at this program from the perspective of MapReduce development, these are the points to keep in mind in the future:
1. The map function reads its input automatically according to the configured input format. By default each line of the text file becomes the value. Many books say the key is the line, but to be precise the key is the byte offset of the start of that line relative to the beginning of the file; this can be seen in the console output below (the keys are 0, 15, 30, ..., since every line here is 14 characters plus a newline). I also tried giving a directory as the input: map reads the files one by one, and for each file the keys are again the per-line start offsets.
2. Following from point 1, the number of times map is called is determined by the number of lines in the text file, while the number of times reduce is called, as I understand it, depends on the keys written with context.write inside map: the framework automatically groups identical keys together. In other words, if map takes the incoming key and, when calling context.write, partitions all keys by something like key % 10 == 0 into key = A or key = B, then there are only two reduce groups, and all the values belonging to key == A are merged and handed to reduce through the Iterable<Text> parameter (see the sketch after this list). In short, the number of map calls depends on the input format and data, and the number of reduce calls depends on how many distinct key groups the map function writes; identical keys are merged into one group.
3. Pay attention to the input and output types of map and reduce: the map output types must match the reduce input types. The types used inside the function bodies also have to be consistent, or converted explicitly.
4. The map output is automatically merged and sorted by key, so the reduce output also comes out sorted by key. In this example the reduce key is the distance, so the results are written in order of increasing distance; see the output file below.
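To make point 2 concrete, here is a minimal sketch (the class name BucketMapper is made up for illustration and is not part of the job below): no matter how many lines the input has, all map output is funneled into just two keys, A and B, so the reduce function is called only twice.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: every input line is re-keyed to either "A" or "B",
// so the shuffle produces only two key groups and reduce runs only twice.
public class BucketMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text bucket = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // partition on the incoming key (the line's byte offset)
        if (key.get() % 10 == 0) {
            bucket.set("A");
        } else {
            bucket.set("B");
        }
        // everything written under "A" arrives in one Iterable<Text> in reduce,
        // everything written under "B" in the other
        context.write(bucket, value);
    }
}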
Input file: model_data
10 11 12 13 14
20 21 22 23 24
10 13 13 12 11
30 31 32 33 34
10 16 17 12 14
40 41 42 43 44
50 51 52 53 54
32 36 38 34 35
22 24 26 23 27
52 54 53 53 58
Main program: ModelMatch.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ModelMatch {
public static void main(String[] args) throws Exception
{
Path inputPath = new Path("hdfs://192.168.56.171:9000/ModelMatch/input/model_data");
Path outputPath = new Path("hdfs://192.168.56.171:9000/ModelMatch/output");
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://192.168.56.171:9000");
// if the output path already exists, delete it
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
Job job = new Job(conf,"ModelMatch");
job.setJarByClass(ModelMatch.class);
job.setOutputKeyClass(DoubleWritable.class); // the reducer emits DoubleWritable keys
job.setOutputValueClass(Text.class);
job.setMapperClass(MMapper.class);
job.setMapOutputKeyClass(DoubleWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MReducer.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
}
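One more note on the job configuration: this run uses a single reduce task (the default, and the local runner used here runs only one), which is why all the results land in one globally sorted output file. With several reducers, each part file would only be sorted within itself; to keep one globally sorted file the reducer count can be pinned explicitly, e.g.:

// keep a single reduce task so all distances end up in one sorted output file
job.setNumReduceTasks(1);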
Mapper: MMapper.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MMapper extends Mapper<LongWritable, Text, DoubleWritable, Text>{
final int DATASIZE = 5; // dimension of each vector
double[] newData = {11, 11, 11, 11, 11}; // the new (query) vector to compare against
protected void setup(Context context) throws IOException,InterruptedException
{
//System.out.println("map setup");
}
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
//System.out.println("-- map start --");
System.out.println("-- key --" + key.get() + "-- value --" + value);
double[] tmpModData = new double[DATASIZE];
int count = 0;
String lineString = value.toString();
StringTokenizer tokenizer = new StringTokenizer(lineString);
while (tokenizer.hasMoreTokens()) {
String valString = tokenizer.nextToken();
if (count < DATASIZE) { // guard against lines with more than DATASIZE values
tmpModData[count] = Double.parseDouble(valString);
}
count++;
}
if (count != DATASIZE) {
System.out.println("Map Token Error!");
}
double distance = GetDistance(tmpModData, newData);
if (distance == -1) {
System.out.println("Map GetDistance Error!");
}
context.write(new DoubleWritable(distance), new Text(value));
//System.out.println("-- map end --");
}
// returns the squared Euclidean distance (no square root), or -1 if the lengths differ
public double GetDistance(double[] arrayA, double[] arrayB){
if (arrayA.length != arrayB.length) {
return -1;
}
double dis = 0;
for (int i = 0; i < arrayA.length; i++) {
dis += Math.pow(arrayA[i] - arrayB[i], 2);
}
return dis;
}
}
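As a quick sanity check of GetDistance (a throwaway snippet, not part of the job): for the query vector {11, 11, 11, 11, 11} and the closest input row 10 13 13 12 11, the squared distance is 1 + 4 + 4 + 1 + 0 = 10, which matches the first line of the final output.

// Standalone check of the squared-distance calculation used by MMapper.
public class DistanceCheck {
    public static void main(String[] args) {
        double[] query = {11, 11, 11, 11, 11};
        double[] row   = {10, 13, 13, 12, 11};
        // (10-11)^2 + (13-11)^2 + (13-11)^2 + (12-11)^2 + (11-11)^2 = 10
        System.out.println(new MMapper().GetDistance(row, query)); // prints 10.0
    }
}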
Reducer: MReducer.java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MReducer extends Reducer<DoubleWritable, Text, DoubleWritable, Text> {
public void reduce(DoubleWritable key,Iterable<Text> value,Context context) throws IOException,InterruptedException
{
//System.out.println("-- reduce start--");
System.out.println("-- distance --" + key);
for (Iterator<Text> iterator = value.iterator(); iterator.hasNext();) {
Text val = iterator.next();
System.out.println("-- value --" + val);
context.write(key, val);
}
// System.out.println("-- reduce end--");
}
}
Console output
13/06/04 10:16:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/06/04 10:16:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/06/04 10:16:12 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/06/04 10:16:12 INFO input.FileInputFormat: Total input paths to process : 1
13/06/04 10:16:12 WARN snappy.LoadSnappy: Snappy native library not loaded
13/06/04 10:16:12 INFO mapred.JobClient: Running job: job_local_0001
13/06/04 10:16:12 INFO util.ProcessTree: setsid exited with exit code 0
13/06/04 10:16:12 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1961581
13/06/04 10:16:12 INFO mapred.MapTask: io.sort.mb = 100
13/06/04 10:16:12 INFO mapred.MapTask: data buffer = 79691776/99614720
13/06/04 10:16:12 INFO mapred.MapTask: record buffer = 262144/327680
-- key --0-- value --10 11 12 13 14
13/06/04 10:16:12 INFO mapred.MapTask: Starting flush of map output
-- key --15-- value --20 21 22 23 24
-- key --30-- value --10 13 13 12 11
-- key --45-- value --30 31 32 33 34
-- key --60-- value --10 16 17 12 14
-- key --75-- value --40 41 42 43 44
-- key --90-- value --50 51 52 53 54
-- key --105-- value --32 36 38 34 35
-- key --120-- value --22 24 26 23 27
-- key --135-- value --52 54 53 53 58
13/06/04 10:16:12 INFO mapred.MapTask: Finished spill 0
13/06/04 10:16:12 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/06/04 10:16:13 INFO mapred.JobClient: map 0% reduce 0%
13/06/04 10:16:15 INFO mapred.LocalJobRunner:
13/06/04 10:16:15 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/06/04 10:16:15 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@76e8a7
13/06/04 10:16:15 INFO mapred.LocalJobRunner:
13/06/04 10:16:15 INFO mapred.Merger: Merging 1 sorted segments
13/06/04 10:16:15 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 252 bytes
13/06/04 10:16:15 INFO mapred.LocalJobRunner:
-- distance --10.0
-- value --10 13 13 12 11
-- distance --15.0
-- value --10 11 12 13 14
-- distance --72.0
-- value --10 16 17 12 14
-- distance --615.0
-- value --20 21 22 23 24
-- distance --915.0
-- value --22 24 26 23 27
-- distance --2215.0
-- value --30 31 32 33 34
-- distance --2900.0
-- value --32 36 38 34 35
-- distance --4815.0
-- value --40 41 42 43 44
-- distance --8415.0
-- value --50 51 52 53 54
-- distance --9267.0
-- value --52 54 53 53 58
13/06/04 10:16:15 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
13/06/04 10:16:15 INFO mapred.LocalJobRunner:
13/06/04 10:16:15 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
13/06/04 10:16:15 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.56.171:9000/ModelMatch/output
13/06/04 10:16:16 INFO mapred.JobClient: map 100% reduce 0%
13/06/04 10:16:18 INFO mapred.LocalJobRunner: reduce > reduce
13/06/04 10:16:18 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
13/06/04 10:16:19 INFO mapred.JobClient: map 100% reduce 100%
13/06/04 10:16:19 INFO mapred.JobClient: Job complete: job_local_0001
13/06/04 10:16:19 INFO mapred.JobClient: Counters: 22
13/06/04 10:16:19 INFO mapred.JobClient: File Output Format Counters
13/06/04 10:16:19 INFO mapred.JobClient: Bytes Written=212
13/06/04 10:16:19 INFO mapred.JobClient: FileSystemCounters
13/06/04 10:16:19 INFO mapred.JobClient: FILE_BYTES_READ=604
13/06/04 10:16:19 INFO mapred.JobClient: HDFS_BYTES_READ=300
13/06/04 10:16:19 INFO mapred.JobClient: FILE_BYTES_WRITTEN=81710
13/06/04 10:16:19 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=212
13/06/04 10:16:19 INFO mapred.JobClient: File Input Format Counters
13/06/04 10:16:19 INFO mapred.JobClient: Bytes Read=150
13/06/04 10:16:19 INFO mapred.JobClient: Map-Reduce Framework
13/06/04 10:16:19 INFO mapred.JobClient: Map output materialized bytes=256
13/06/04 10:16:19 INFO mapred.JobClient: Map input records=10
13/06/04 10:16:19 INFO mapred.JobClient: Reduce shuffle bytes=0
13/06/04 10:16:19 INFO mapred.JobClient: Spilled Records=20
13/06/04 10:16:19 INFO mapred.JobClient: Map output bytes=230
13/06/04 10:16:19 INFO mapred.JobClient: Total committed heap usage (bytes)=182779904
13/06/04 10:16:19 INFO mapred.JobClient: CPU time spent (ms)=0
13/06/04 10:16:19 INFO mapred.JobClient: SPLIT_RAW_BYTES=119
13/06/04 10:16:19 INFO mapred.JobClient: Combine input records=0
13/06/04 10:16:19 INFO mapred.JobClient: Reduce input records=10
13/06/04 10:16:19 INFO mapred.JobClient: Reduce input groups=10
13/06/04 10:16:19 INFO mapred.JobClient: Combine output records=0
13/06/04 10:16:19 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
13/06/04 10:16:19 INFO mapred.JobClient: Reduce output records=10
13/06/04 10:16:19 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
13/06/04 10:16:19 INFO mapred.JobClient: Map output records=10
Final output
distance    vector
10.0        10 13 13 12 11
15.0        10 11 12 13 14
72.0        10 16 17 12 14
615.0       20 21 22 23 24
915.0       22 24 26 23 27
2215.0      30 31 32 33 34
2900.0      32 36 38 34 35
4815.0      40 41 42 43 44
8415.0      50 51 52 53 54
9267.0      52 54 53 53 58