I have two pair RDDs let say
我说有两对RDD
RDD1 : [(1,a),(2,b),(3,c)]
RDD2 : [(1,d),(2,e),(3,f)]
Now am joining these RDDs using join
现在我正在使用join加入这些RDD
RDD3 = RDD1.join(RDD2);
And i have displayed the elements in RDD3 with below code
我已经在RDD3中显示了以下代码中的元素
for(Tuple2<Integer,Tuple2<String,String>> tuple : RDD3.collect())
System.out.println(tuple._1()+":"+tuple._2()._1()+","+tuple._2()._2());
i have seen weird results like
我见过奇怪的结果
5:b,e
4:a,d
6:c,f
where as i expected like
像我期望的那样
1:a,d
1:b,e
1:c,f
Is there any way to get a desired output like the above ? or else am interpreting RDD behavior wrongly ? Please suggest
有没有办法像上面那样获得所需的输出?或者错误地解释RDD行为?请建议
Edit :
Actually i am reading data like this
其实我正在读这样的数据
JavaDoubleRDD data1 = sc.parallelizeDoubles(Arrays.asList(45.25,22.15,33.24));
JavaDoubleRDD data2 = sc.parallelizeDoubles(Arrays.asList(23.45,19.35,12.45));
and then
JavaPairRDD<Double,Double> lat1 = data1.cartesian(data1);
JavaRDD<Double> lat2 = lat1.map(new Function<Tuple2<Double,Double>,Double>() {
@Override
public Double call(Tuple2<Double,Double> t) {
return Math.pow(t._1()-t._2(),2);
}
});
//flag and flag1 are static variables initially equal to 1
JavaPairRDD<Integer,Double> lat3 = lat2.mapToPair(new PairFunction<Double,Integer,Double>() {
@Override
public Tuple2<Integer,Double> call(Double d) {
return new Tuple2<Integer,Double>(flag++,d);
}
});
System.out.println("Latitude values display");
for(Tuple2<?,?> tuple : lat3.collect()) {
System.out.println(tuple._1()+":"+tuple._2());
}
JavaPairRDD<Double,Double> long1 = data2.cartesian(data2);
JavaRDD<Double> long2 = long1.map(new Function<Tuple2<Double,Double>,Double>() {
@Override
public Double call(Tuple2<Double,Double> t) {
return Math.pow(t._1()-t._2(),2);
}
});
JavaPairRDD<Integer,Double> long3 = long2.mapToPair(new PairFunction<Double,Integer,Double>() {
@Override
public Tuple2<Integer,Double> call(Double d ) {
return new Tuple2<Integer,Double>(flag1++,d);
}
});
System.out.println("Longitude values display");
for(Tuple2<?,?> tuple : long3.collect()) {
System.out.println(tuple._1()+":"+tuple._2());
}
System.out.println("latitude and longitude values join");
JavaPairRDD<Integer,Tuple2<Double,Double>> weightmatrix1 = lat3.join(long3);
System.out.println("Weightmatrix1 Display");
for(Tuple2<?,Tuple2<?,?>> tuple : weightmatrix1.collect()) {
System.out.println(tuple._1()+":"+tuple._2()._1()+","+tuple._2()._2());
}
So what am doing is calculating weight matrix based on latitude and longitude values
那么我们正在做的是根据纬度和经度值计算权重矩阵
2 个解决方案
#1
When I do:
当我做:
scala> val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
scala> val rdd2 = sc.parallelize(Array((1,"d"),(2,"e"),(3,"f")))
scala> val rdd3 = rdd1.join(rdd2)
scala> rdd3.toArray.foreach(println(_))
I consistently get:
我一直得到:
(1,(a,d))
(2,(b,e))
(3,(c,f))
#2
That's what I tried with expected results:
这就是我尝试的预期结果:
val data1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
val data2 = sc.parallelize(Array((1,"d"),(2,"e"),(3,"f")))
val data3 = data1.join(data2)
data3.collect().map(tuple => tuple._1 + ":"+tuple._2._1+","+tuple._2._2).foreach(println(_))
Obtaining:
1:a,d
2:b,e
3:c,f
So this is scala. I suppose in Java should be the same output.
所以这是scala。我想在Java中应该是相同的输出。
#1
When I do:
当我做:
scala> val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
scala> val rdd2 = sc.parallelize(Array((1,"d"),(2,"e"),(3,"f")))
scala> val rdd3 = rdd1.join(rdd2)
scala> rdd3.toArray.foreach(println(_))
I consistently get:
我一直得到:
(1,(a,d))
(2,(b,e))
(3,(c,f))
#2
That's what I tried with expected results:
这就是我尝试的预期结果:
val data1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
val data2 = sc.parallelize(Array((1,"d"),(2,"e"),(3,"f")))
val data3 = data1.join(data2)
data3.collect().map(tuple => tuple._1 + ":"+tuple._2._1+","+tuple._2._2).foreach(println(_))
Obtaining:
1:a,d
2:b,e
3:c,f
So this is scala. I suppose in Java should be the same output.
所以这是scala。我想在Java中应该是相同的输出。