共同好友:求大量集合的两两交集 目标:令U为包含所有用户的一个集合:{U1,U2,...,Un},我们的目标是为每个(Ui,Uj)对(i!=j)找出共同好友。 前提:好友关系是双向的 输入:<person><,><friend1>< ><friend2>< >...<friendN> 100,200 300 400 500 600 200,100 300 400 300,100 200 400 500 400,100 200 300 500,100,300
600,100
把每个人的好友,两两配对形成新的集合 比如第一行进行map以后为
(100,((200,300),(200,400),(200,500),(200,600),(300,400),
(300,500),(300,600),(400,500),(400,500),(500,600))
然后顺着这个思路用flatMapValues 将所有的values拆分,然后进行reduceByKey 可以得出每一对的共同好友
值得注意的事,这里每一组的好友都是按照数字顺序,所以分组不需要对其排序,实际情况下可以先对每个人的好友
进行排序,避免重复
以下是实现代码
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import scala.collection.mutable.ArrayBuffer object CommonFriends { def main(args: Array[String]): Unit = { val conf=new SparkConf().setAppName("Friends").setMaster("local") val sc =new SparkContext(conf) //读取文件 val rddFile=sc.textFile("/input/friends.txt") //按行用逗号切分 k,v val rddLine=rddFile.map(_.split(",")) //让rddLine存入缓存 rddLine.cache() //初始化一个空共同好友集合,将所有人两两配对 val rddEmpty=rddLine.map(x=>("",x(0))).groupByKey().map(x=>{ val person=x._1 var paris:Array[Array[String]]=Array() val friends= x._2.toArray if(friends.length>=2){ for (i<- 0 until friends.length-1){ for(j<-i+1 until friends.length){ val array=Array(friends(i),friends(j)) paris=paris.++(array::Nil) } } } (person,paris) }) //计算有共同好友的好友对 val rddSplit=rddLine.map(arr=>{ //创建好友对数组 var paris:Array[Array[String]]=Array() val person=arr(0) //将v 用空格切分为数组 val friends=arr(1).split(" ") //两两配对 if(friends.length>=2){ for (i<- 0 until friends.length-1){ for(j<-i+1 until friends.length){ val array=Array(friends(i),friends(j)) paris=paris.++(array::Nil) } } } (person,paris) }) //将两个rdd关联,如果没有共同好友会显示空 val rddGroups=rddSplit.union(rddEmpty) //对paris 进行拆分最为key ,然后聚合 val rddResult=rddGroups.flatMapValues(value=>value.map(x=>x)).map(tuple=>((tuple._2(0),tuple._2(1)),tuple._1)).reduceByKey(_+" "+_) rddResult.saveAsTextFile("/output13") sc.stop() } }
运行结果