MapReduce之人脉计算

时间:2022-08-17 16:56:35

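需求:根据下面的好友列表(每行第一个是用户,后面是该用户的直接好友),为每个用户找出二度人脉(好友的好友,且不是直接好友),并按共同好友数从多到少排序。注意:实际输入文件中各字段需用制表符(\t)分隔,因为作业使用 KeyValueTextInputFormat,并按 \t 切分好友列表。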
小明 老王 如花 林志玲
老王 小明 凤姐
如花 小明 李刚 凤姐
林志玲 小明 李刚 凤姐 郭美美
李刚 如花 凤姐 林志玲
郭美美 凤姐 林志玲
凤姐 如花 老王 林志玲 郭美美

Step 1:
User.java

    /** Name of the user this record belongs to; FriendsSort and FriendsGroup compare on this field. */
    private String user;
/** The counterpart of {@code user} in the pair (Mapper2 fills it with the other member). */
private String other;
/** Count parsed for the pair by Mapper2; FriendsSort orders it descending within one user. */
private int count;
/**
 * Returns the user name held by this record.
 *
 * @return the current value of the {@code user} field
 */
public String getUser() {
    return this.user;
}
/**
 * Replaces the user name held by this record.
 *
 * @param user the new value of the {@code user} field
 */
public void setUser(String user) {
    this.user = user;
}
/**
 * Returns the counterpart name held by this record.
 *
 * @return the current value of the {@code other} field
 */
public String getOther() {
    return this.other;
}
/**
 * Replaces the counterpart name held by this record.
 *
 * @param other the new value of the {@code other} field
 */
public void setOther(String other) {
    this.other = other;
}
/**
 * Returns the count held by this record.
 *
 * @return the current value of the {@code count} field
 */
public int getCount() {
    return this.count;
}
/**
 * Replaces the count held by this record.
 *
 * @param count the new value of the {@code count} field
 */
public void setCount(int count) {
    this.count = count;
}
/**
 * Serializes this record to {@code out}.
 *
 * <p>The fields are written in the fixed order user, other, count;
 * {@code readFields} reads them back in the same order, so the two methods
 * must be kept in sync.
 *
 * @param out the sink receiving the two strings (via {@code writeUTF}) and the int
 * @throws IOException if the underlying output fails
 */
public void write(DataOutput out) throws IOException {
    out.writeUTF(this.user);
    out.writeUTF(this.other);
    out.writeInt(this.count);
}
/**
 * Restores this record from {@code in}, overwriting all three fields.
 *
 * <p>Reads user, other and count in exactly the order {@code write} emits them.
 *
 * @param in the source providing two UTF strings followed by an int
 * @throws IOException if the underlying input fails or ends early
 */
public void readFields(DataInput in) throws IOException {
    user = in.readUTF();
    other = in.readUTF();
    count = in.readInt();
}
/**
 * Orders records by user (ascending), then by count (descending), then by
 * other (ascending).
 *
 * <p>The first two keys mirror the ordering implemented by FriendsSort; the
 * final tie-break on {@code other} makes the result 0 only when all three
 * fields match. The previous implementation returned -1 for every pair of
 * distinct objects, so {@code a.compareTo(b)} and {@code b.compareTo(a)} were
 * both negative, which violates the {@code Comparable} contract.
 *
 * <p>The {@code user} and {@code other} fields are expected to be non-null
 * (as {@code write} also requires).
 *
 * @param o the record to compare against; must not be null
 * @return a negative value, zero, or a positive value as this record sorts
 *         before, equal to, or after {@code o}
 * @throws NullPointerException if {@code o} is null, or if a compared string
 *         field is null on two distinct records
 */
public int compareTo(User o) {
    // Identity fast path: also keeps self-comparison safe on a half-initialised record.
    if (this == o) {
        return 0;
    }
    int byUser = user.compareTo(o.user);
    if (byUser != 0) {
        return byUser;
    }
    // Arguments swapped on purpose: larger counts sort first.
    int byCount = Integer.compare(o.count, count);
    if (byCount != 0) {
        return byCount;
    }
    return other.compareTo(o.other);
}

Step 2:
Fof.java

/**
 * Creates an instance through the superclass no-argument constructor.
 *
 * <p>NOTE(review): presumably needed so the framework can instantiate the key
 * reflectively during deserialization — confirm against the class header.
 */
public Fof() {
    super();
}

/**
 * Creates the key for the pair {@code (a, b)}.
 *
 * <p>The two names are normalised by {@code join}, so {@code new Fof(a, b)}
 * and {@code new Fof(b, a)} pass the same string to the superclass.
 * {@code join} returns {@code null} when either name is blank, and that
 * {@code null} is handed to the superclass constructor unchanged.
 *
 * @param a one member of the pair
 * @param b the other member of the pair
 */
public Fof(String a, String b) {
    super(Fof.join(a, b));
}

/**
 * Builds an order-independent, tab-separated key for a pair of names.
 *
 * <pre>
 * zs,ls --> zs \t ls
 * ls,zs --> zs \t ls
 * </pre>
 *
 * @param a one name of the pair
 * @param b the other name of the pair
 * @return the two names joined by a tab character with the one that compares
 *         smaller (per {@code String.compareTo}) first, or {@code null} when
 *         either name is blank
 */
public static String join(String a, String b) {
    // Guard clause: a pair with a blank member has no key.
    if (!StringUtils.isNotBlank(a) || !StringUtils.isNotBlank(b)) {
        return null;
    }
    return a.compareTo(b) < 0 ? a + "\t" + b : b + "\t" + a;
}

Step 3:
RunJob.java

Mapper——1:

/**
 * First-pass mapper.
 *
 * <p>The key is a user and the value is that user's tab-separated friend
 * list. For every friend it emits the (user, friend) pair with value 2, and
 * for every unordered pair of friends in the list it emits that pair with
 * value 1.
 */
static class Mapper1 extends Mapper<Text, Text, Fof, IntWritable> {
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        final String owner = key.toString();
        final String[] names = StringUtils.split(value.toString(), '\t');
        for (int left = 0; left < names.length; left++) {
            final String current = names[left];
            // 2 marks owner and current as direct friends.
            context.write(new Fof(owner, current), new IntWritable(2));
            // Every later name shares owner as a common friend with current.
            for (int right = left + 1; right < names.length; right++) {
                context.write(new Fof(current, names[right]), new IntWritable(1));
            }
        }
    }
}

Reducer——1:

/**
 * First-pass reducer.
 *
 * <p>Adds up the values received for one pair and writes the pair with that
 * sum. If any value equals 2 the pair is a direct friendship and nothing is
 * written for it.
 */
static class Reducer1 extends Reducer<Fof, IntWritable, Fof, IntWritable> {
    protected void reduce(Fof arg0, Iterable<IntWritable> arg1, Context arg2)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable marker : arg1) {
            int v = marker.get();
            if (v == 2) {
                // A 2 in the group means this pair is a direct friendship: skip it.
                return;
            }
            total += v;
        }
        arg2.write(arg0, new IntWritable(total));
    }
}

Mapper——2:

/**
 * Second-pass mapper.
 *
 * <p>The key is one name; the value holds the other name and a count
 * separated by a tab. The pair is emitted in both directions so each member
 * receives the other as a candidate, with the value formatted as
 * {@code name:count}.
 */
static class Mapper2 extends Mapper<Text, Text, User, Text> {

    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        String first = key.toString();
        String[] parts = value.toString().split("\t");
        String second = parts[0];
        int shared = Integer.parseInt(parts[1]);

        User forward = newUser(first, second, shared);
        User backward = newUser(second, first, shared);
        context.write(forward, new Text(second + ":" + shared));
        context.write(backward, new Text(first + ":" + shared));
    }

    /** Creates a User populated with the given user, other and count. */
    private static User newUser(String user, String other, int count) {
        User u = new User();
        u.setUser(user);
        u.setOther(other);
        u.setCount(count);
        return u;
    }
}

FriendsSort :

    /**
     * Comparator for User keys: user ascending, then count descending.
     *
     * <p>The driver in this file installs it as the second job's sort comparator.
     */
    static class FriendsSort extends WritableComparator {

        public FriendsSort() {
            super(User.class, true);
        }

        public int compare(WritableComparable a, WritableComparable b) {
            User left = (User) a;
            User right = (User) b;
            int byUser = left.getUser().compareTo(right.getUser());
            if (byUser != 0) {
                return byUser;
            }
            // Arguments swapped so that larger counts come first.
            return Integer.compare(right.getCount(), left.getCount());
        }
    }

FriendsGroup:

    /**
     * Comparator for User keys that looks only at the user name, so keys with
     * the same user compare as equal regardless of other and count.
     *
     * <p>The driver in this file installs it as the second job's grouping comparator.
     */
    static class FriendsGroup extends WritableComparator {

        public FriendsGroup() {
            super(User.class, true);
        }

        public int compare(WritableComparable a, WritableComparable b) {
            User left = (User) a;
            User right = (User) b;
            String leftName = left.getUser();
            return leftName.compareTo(right.getUser());
        }
    }

Reducer——2:

/**
 * Second-pass reducer.
 *
 * <p>Concatenates every value of the group, each followed by a comma (so the
 * result ends with a trailing comma), and writes it under the group's user
 * name.
 */
static class Reducer2 extends Reducer<User, Text, Text, Text> {
    protected void reduce(User arg0, Iterable<Text> arg1, Context arg2)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        for (Text candidate : arg1) {
            joined.append(candidate.toString()).append(",");
        }
        // The user name is read after the loop, as in the original.
        Text owner = new Text(arg0.getUser());
        arg2.write(owner, new Text(joined.toString()));
    }
}

Main:

    /**
     * Runs the two chained MapReduce jobs.
     *
     * <p>Job 1 (Mapper1/Reducer1) reads {@code /usr/input/friend} and writes
     * {@code /usr/output/friend}. Job 2 (Mapper2/Reducer2, with FriendsSort
     * and FriendsGroup) reads that output and writes
     * {@code /usr/output/friend2}. An existing output directory is deleted
     * before each job. Job 2 is started only if job 1 succeeds.
     *
     * <p>If either job fails or an exception is thrown, the failure is
     * reported on stderr and the JVM exits with status 1, so calling scripts
     * can detect it. Previously a failed job produced no message and the
     * process still ended with status 0.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node12:8020");
        // If the output directory is not writable, grant access first:
        //   hdfs dfs -chmod 777 /usr/output/
        boolean succeeded = false;
        try {
            FileSystem fs = FileSystem.get(conf);

            // NOTE(review): the job name "weather" looks copied from another
            // example; it is kept unchanged here.
            Job job = Job.getInstance(conf, "weather");
            job.setJarByClass(RunJob.class);

            job.setMapperClass(Mapper1.class);
            job.setReducerClass(Reducer1.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Each line is split at the first tab: left part is the key, the rest is the value.
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/friend"));

            Path output = new Path("/usr/output/friend");
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("第一个 mr —job 执行成功");
                System.out.println("开始执行第二个mapreduce");
                job = Job.getInstance(conf, "friend");
                job.setJarByClass(RunJob.class);

                job.setMapperClass(Mapper2.class);
                job.setReducerClass(Reducer2.class);
                job.setMapOutputKeyClass(User.class);
                job.setMapOutputValueClass(Text.class);
                job.setSortComparatorClass(FriendsSort.class);
                job.setGroupingComparatorClass(FriendsGroup.class);

                // Each line is split at the first tab: left part is the key, the rest is the value.
                job.setInputFormatClass(KeyValueTextInputFormat.class);
                FileInputFormat.addInputPath(job, new Path("/usr/output/friend"));

                output = new Path("/usr/output/friend2");
                if (fs.exists(output)) {
                    fs.delete(output, true);
                }
                FileOutputFormat.setOutputPath(job, output);

                f = job.waitForCompletion(true);
                if (f) {
                    System.out.println("第二个mr执行成功!");
                    succeeded = true;
                } else {
                    System.err.println("第二个mr执行失败!");
                }
            } else {
                System.err.println("第一个 mr —job 执行失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Surface the failure to the caller through the exit status.
        if (!succeeded) {
            System.exit(1);
        }
    }