How to save the top N sorted values from a DStream in Java

Time: 2023-01-16 20:49:09

I have a sorted DStream, which I can print as follows:

     sorted.foreach(
       new Function<JavaPairRDD<Double,String>, Void>(){
           public Void call(JavaPairRDD<Double, String> rdd){
               String out = "\n Top Values: \n";           
               for (Tuple2<Double, String> t: rdd.take(10)){
                   out = out + t.toString() + "\n";                    
               }
               System.out.println(out);
               return null;
           }});

However, I would like to save these values to a text file instead of just printing them. Please note: I want to save only the top 10 values to the text file, not the entire DStream.

I'd appreciate any help. Also, I am coding in Java, not Scala.

2 Answers

#1


1  

Assuming that your input is already sorted, this is how it can be done in Scala:

    val location = "hdfs://..."
    val target = 10
    sorted.foreachRDD { (rdd, time) =>
      // Count the elements in each partition, then take a running sum so that
      // entry i holds the number of elements that precede partition i.
      val partitionElemCounts = rdd.mapPartitions(items => Iterator(items.size))
        .collect().scanLeft(0)(_ + _)
      // Keep only as many elements from each partition as are still needed.
      val nRdd = rdd.mapPartitionsWithIndex { (partition, items) =>
        items.take(math.max(0, target - partitionElemCounts(partition)))
      }
      // Append the batch time to the path so each batch is written to a different directory.
      val out = location + time
      nRdd.saveAsTextFile(out)
    }
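
The bookkeeping in the snippet above can be looked at in isolation in plain Java, without Spark. This is only a sketch under assumptions: `PartitionQuota` and `quotas` are hypothetical names, and the partition sizes are passed in as a list instead of being collected from an RDD. It shows how the running sum of partition sizes determines how many leading elements each partition contributes to the first `target` values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark.
final class PartitionQuota {
    /**
     * For partitions holding sizes.get(i) elements of an already sorted sequence,
     * returns how many leading elements to keep from each partition so that the
     * kept elements are the first `target` elements overall.
     */
    static List<Integer> quotas(List<Integer> sizes, int target) {
        List<Integer> result = new ArrayList<>();
        int preceding = 0; // running sum: elements held by all earlier partitions
        for (int size : sizes) {
            // Same rule as max(0, target - partitionElemCounts(partition)) above.
            int stillNeeded = Math.max(0, target - preceding);
            // A partition cannot contribute more elements than it holds.
            result.add(Math.min(size, stillNeeded));
            preceding += size;
        }
        return result;
    }
}
```

For example, `PartitionQuota.quotas(Arrays.asList(4, 3, 5), 10)` returns `[4, 3, 3]`: the first two partitions are kept whole and the third contributes only its first three elements.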

#2


0  

You can do it like this:

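    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DStreamTopN {
      def main(args: Array[String]): Unit = {
        // StreamingExamples is the logging helper from the Spark examples package.
        StreamingExamples.setStreamingLogLevels()

        val sparkConf = new SparkConf().setAppName("DStreamTopN").setMaster("local[3]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.checkpoint("/tmp/checkpoint")

        // UdpReceiver is presumably a custom receiver; its definition is not shown here.
        val lines = ssc.receiverStream(new UdpReceiver(1514, "UTF-8"))

        // Word counts for each batch.
        val wc = lines.flatMap(_.split(" ")).map(_ -> 1).reduceByKey(_ + _)

        // Sort by count in descending order and keep only the top 3 of each batch.
        val sort = wc.transform((rdd: RDD[(String, Int)]) => {
          val topN = rdd.sortBy(_._2, false).take(3)
          rdd.sparkContext.makeRDD(topN)
        })

        sort.foreachRDD(_.foreach(println))

        ssc.start()
        ssc.awaitTermination()
      }
    }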
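
Since the question asks for Java and for a text file rather than console output: `take(n)` returns an ordinary collection on the driver, so the values can also be written with the standard `java.nio.file` API instead of through Spark. The following is a minimal sketch under assumptions, not a definitive implementation. `TopNFileWriter`, `firstN`, `write`, and the `top-<batchTimeMs>.txt` file name pattern are all hypothetical, and the input is assumed to be already sorted and already converted to strings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark.
final class TopNFileWriter {
    // Returns at most n leading entries, preserving the given (already sorted) order.
    static List<String> firstN(List<String> sortedLines, int n) {
        int end = Math.min(Math.max(n, 0), sortedLines.size());
        return new ArrayList<>(sortedLines.subList(0, end));
    }

    // Writes one line per entry to <dir>/top-<batchTimeMs>.txt and returns that path.
    // Using the batch time in the name gives each batch its own file.
    static Path write(Path dir, long batchTimeMs, List<String> lines) {
        try {
            Files.createDirectories(dir);
            Path out = dir.resolve("top-" + batchTimeMs + ".txt");
            return Files.write(out, lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            // Rethrown unchecked so the method can be called from callbacks
            // that do not declare IOException.
            throw new UncheckedIOException(e);
        }
    }
}
```

In the question's `call` method, the `t.toString()` values produced from `rdd.take(10)` could be collected into a list and passed to `write` instead of being concatenated and printed. Note that this writes to the local filesystem of the driver, not to HDFS.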
