Combining datasets with MapReduce

Date: 2023-03-10 01:14:23

While in the SQL world combining two or more datasets is very easy (we just need to use the JOIN keyword), with MapReduce things become a little harder. Let's get into it.
Suppose we have two distinct datasets, one for users of a forum and the other for the posts in the forum (data is in TSV - Tab Separated Values - format).
Users dataset:

id    name   reputation
0102  alice  32
0511  bob    27
...

Posts dataset:

id       type      subject  body                                    userid
0028391  question  test     "Hi, what is.."                         0102
0073626  comment   bug      "Guys, I've found.."                    0511
0089234  comment   bug      "Nope, it's not that way.."             0734
0190347  answer    info     "In my opinion it's worth the time.."   1932
...

What we'd like to do is combine each user's reputation with the number of questions he/she posted, to see whether the two are related.

The main idea behind combining the two datasets is to leverage the shuffle and sort phase: this process groups together values with the same key, so if we define the user id as the key, we can send to the reducer both the user's reputation and a record of each of his/her questions, because they're attached to the same key (the user id).
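The grouping can be pictured without Hadoop at all. The following is a minimal plain-Java sketch, not Hadoop code: the class name `ShuffleSketch`, the method `groupByKey` and the value labels `rep:32` and `question` are made up for illustration. It only mimics the effect of the shuffle and sort phase, which is that every value emitted under the same user id ends up in one group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {

    // Groups (key, value) pairs by key, with keys kept in sorted order.
    // Each pair is a two-element array: [0] is the user id, [1] is the value.
    public static Map<String, List<String>> groupByKey(List<String[]> pairs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Pairs as they might come out of the mappers, users and posts mixed together.
        List<String[]> mapperOutput = Arrays.asList(
                new String[] {"0511", "question"},
                new String[] {"0102", "rep:32"},
                new String[] {"0102", "question"},
                new String[] {"0511", "rep:27"},
                new String[] {"0102", "question"});

        // prints {0102=[rep:32, question, question], 0511=[question, rep:27]}
        System.out.println(groupByKey(mapperOutput));
    }
}
```

In this sketch the values of a group keep their insertion order; in a real job the reducer should not rely on any particular order of the values for a key.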
Let's see how. 
We start with the mapper:

// imports go at the top of the file that contains this nested class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public static class JoinMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        // gets the name of the input file this record comes from
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String filename = fileSplit.getPath().getName();

        // creates an array with all the fields of the row we're reading now
        String[] fields = value.toString().split("\t");

        // if we're reading the posts file
        if (filename.equals("forum_nodes_no_lf.tsv")) {
            // retrieves the type of the post and, for questions only, the author ID
            String type = fields[1];
            if (type.equals("question")) {
                String authorId = fields[4];
                context.write(new Text(authorId), one);
            }
        }
        // if we're reading the users file
        else {
            String authorId = fields[0];
            String reputation = fields[2];

            // we add two to the reputation, because we want the minimum value to be greater than 1,
            // not to be confused with the "one" passed by the other branch of the if
            int reputationValue = Integer.parseInt(reputation) + 2;
            context.write(new Text(authorId), new IntWritable(reputationValue));
        }
    }
}

First of all, this code assumes that the directory Hadoop is reading data from contains two files: the users file and the posts file. We use the FileSplit class to obtain the name of the file Hadoop is currently reading, so we know whether we're dealing with the users file or the posts file. Then things get a little trickier. For every user, we pass to the reducer a "1" for every question he/she posted on the forum; since we also want to pass the user's reputation (which can itself be "0" or "1"), we have to be careful not to mix up the two kinds of value. To do this, we add 2 to the reputation, so that, even if it is "0", the value passed to the reducer is greater than or equal to two. This way, when the reducer receives a "1" it knows it is counting a question posted on the forum, and when it receives a value greater than "1" it knows it is the user's reputation.
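The offset trick can be isolated in a few lines of plain Java. This is only a sketch: the class `ValueTagging` and its helper names are hypothetical and do not appear in the job itself. It also makes one limitation visible, under the assumption (not stated in the datasets above) that reputations are never negative: a reputation of -1 would be encoded as 1 and read as a question.

```java
public class ValueTagging {

    static final int QUESTION_MARKER = 1;   // value emitted for each question
    static final int REPUTATION_OFFSET = 2; // shift applied to every reputation

    // Shifts a reputation so that 0 becomes 2, 1 becomes 3, and so on.
    public static int encodeReputation(int reputation) {
        return reputation + REPUTATION_OFFSET;
    }

    // True when the value is the marker emitted for a question.
    public static boolean isQuestionMarker(int value) {
        return value == QUESTION_MARKER;
    }

    // Undoes the shift applied by encodeReputation.
    public static int decodeReputation(int value) {
        return value - REPUTATION_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(encodeReputation(0));                    // prints 2
        System.out.println(isQuestionMarker(encodeReputation(1)));  // prints false
        System.out.println(decodeReputation(encodeReputation(27))); // prints 27
        // A negative reputation breaks the scheme: -1 is encoded as the marker.
        System.out.println(isQuestionMarker(encodeReputation(-1))); // prints true
    }
}
```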
Let's now look at the reducer:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class JoinReducer extends Reducer<Text, IntWritable, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int postsNumber = 0;
        int reputation = 0;
        String authorId = key.toString();

        for (IntWritable value : values) {
            int intValue = value.get();
            if (intValue == 1) {
                // a "1" marks one question posted by this user
                postsNumber++;
            } else {
                // we subtract two for having the exact reputation value (see the mapper)
                reputation = intValue - 2;
            }
        }

        context.write(new Text(authorId), new Text(reputation + "\t" + postsNumber));
    }
}

As stated before, the reducer now receives two kinds of data: a "1" for each question the user posted, and a value greater than one for the reputation. The code in the reducer checks exactly this: if it receives a "1" it increases the number of posts of this user, otherwise it sets his/her reputation. At the end of the method, we tell the reducer to output the authorId, his/her reputation and how many questions he/she has posted on the forum:

userid  reputation  posts#
0102    55          23
0511    05          11
0734    00          89
1932    19          32
...

and we're ready to analyze these data.
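The reducer's counting logic can be tried locally before running a job. The sketch below is plain Java with no Hadoop types; the class `ReduceSketch` and its `reduce` method are hypothetical stand-ins that take the values of one user id as a list of integers and return the text that would follow the user id in the output.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {

    // Folds the values of one user into "reputation<TAB>questions".
    public static String reduce(List<Integer> values) {
        int questions = 0;
        int reputation = 0;
        for (int value : values) {
            if (value == 1) {
                questions++;            // marker for one question
            } else {
                reputation = value - 2; // undo the offset added by the mapper
            }
        }
        return reputation + "\t" + questions;
    }

    public static void main(String[] args) {
        // Reputation 32 (sent as 34) and three questions:
        // prints 0102, 32 and 3 separated by tabs
        System.out.println("0102\t" + reduce(Arrays.asList(1, 34, 1, 1)));

        // A user id with questions but no row in the users file:
        // prints 0734, 0 and 2 separated by tabs
        System.out.println("0734\t" + reduce(Arrays.asList(1, 1)));
    }
}
```

The second call shows a consequence of this design: when no reputation value arrives for a key, the output reports 0, which cannot be told apart from a real reputation of 0.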

from: http://andreaiacono.blogspot.com/2014/09/combining-datasets-with-mapreduce.html